diff options
author | pwielebs <piotr.wielebski@nokia.com> | 2019-05-16 17:44:45 +0200 |
---|---|---|
committer | pwielebs <piotr.wielebski@nokia.com> | 2019-05-22 14:01:54 +0200 |
commit | 2cf649dda43c7fc7650b5d0047ccc57108918724 (patch) | |
tree | 03d07378786376e077f7d95a6a98c4f66ab85719 /prh-app-server/src/main/java/org | |
parent | a4f457e46a336a30ceea69a742e8b8aa8f2e720f (diff) |
Align PRH to El Alto SDK
Change-Id: I65c445d76092e11084fb60c68740e1321b35708c
Issue-ID: DCAEGEN2-1501
Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org')
14 files changed, 203 insertions, 229 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java index 3ff81e1f..889dae20 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java @@ -25,8 +25,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientC import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +50,9 @@ import java.util.Optional; public class CbsConfiguration extends PrhAppConfig { private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfiguration.class); private AaiClientConfiguration aaiClientCBSConfiguration; - private DmaapPublisherConfiguration dmaapPublisherCBSConfiguration; - private DmaapConsumerConfiguration dmaapConsumerCBSConfiguration; - private DmaapPublisherConfiguration dmaapUpdatePublisherCBSConfiguration; + private MessageRouterPublishRequest messageRouterCBSPublishRequest; + private MessageRouterSubscribeRequest messageRouterCBSSubscribeRequest; + private MessageRouterPublishRequest messageRouterCBSUpdatePublishRequest; @Autowired private ConsulConfigFileReader consulConfigFileReader; @@ -82,10 +82,10 @@ public class CbsConfiguration extends PrhAppConfig { private void parseCBSConfig(JsonObject jsonObject) { LOGGER.info("Received application configuration: {}", jsonObject); CbsContentParser consulConfigurationParser = new CbsContentParser(jsonObject); - dmaapPublisherCBSConfiguration = consulConfigurationParser.getDmaapPublisherConfig(); - dmaapUpdatePublisherCBSConfiguration = consulConfigurationParser.getDmaapUpdatePublisherConfig(); + messageRouterCBSPublishRequest = consulConfigurationParser.getMessageRouterPublishRequest(); + messageRouterCBSUpdatePublishRequest = consulConfigurationParser.getMessageRouterUpdatePublishRequest(); aaiClientCBSConfiguration = consulConfigurationParser.getAaiClientConfig(); - dmaapConsumerCBSConfiguration = consulConfigurationParser.getDmaapConsumerConfig(); + messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest(); } private void parsingConfigError(Throwable throwable) { @@ -97,13 +97,13 @@ public class CbsConfiguration extends PrhAppConfig { } @Override - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return Optional.ofNullable(dmaapPublisherCBSConfiguration).orElse(super.getDmaapPublisherConfiguration()); + public MessageRouterPublishRequest getMessageRouterPublishRequest() { + return Optional.ofNullable(messageRouterCBSPublishRequest).orElse(super.getMessageRouterPublishRequest()); } @Override - public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() { - return Optional.ofNullable(dmaapUpdatePublisherCBSConfiguration).orElse(super.getDmaapPublisherConfiguration()); + public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() { + return Optional.ofNullable(messageRouterCBSUpdatePublishRequest).orElse(super.getMessageRouterUpdatePublishRequest()); } @Override @@ -112,7 +112,7 @@ public class CbsConfiguration extends PrhAppConfig { } @Override - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return Optional.ofNullable(dmaapConsumerCBSConfiguration).orElse(super.getDmaapConsumerConfiguration()); + public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() { + return Optional.ofNullable(messageRouterCBSSubscribeRequest).orElse(super.getMessageRouterSubscribeRequest()); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java index f19eb3e6..51d86399 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java @@ -30,11 +30,12 @@ import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientC import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import java.time.Duration; import java.util.Map; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName; @@ -47,7 +48,6 @@ class CbsContentParser { private static final String SECURITY_TRUST_STORE_PASS_PATH = "security.trustStorePasswordPath"; private static final String SECURITY_KEY_STORE_PATH = "security.keyStorePath"; private static final String SECURITY_KEY_STORE_PASS_PATH = "security.keyStorePasswordPath"; - private static final String SECURITY_ENABLE_DMAAP_CERT_AUTH = "security.enableDmaapCertAuth"; private static final String CONFIG = "config"; private static final String PNF_UPDATE = "pnf-update"; private static final String PNF_READY = "pnf-ready"; @@ -59,48 +59,24 @@ class CbsContentParser { this.jsonObject = jsonObject.getAsJsonObject(CONFIG); } - DmaapPublisherConfiguration getDmaapPublisherConfig() { + MessageRouterPublishRequest getMessageRouterPublishRequest() { RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_READY)).get(); MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink); - return new ImmutableDmaapPublisherConfiguration.Builder() - .endpointUrl(parsedSink.topicUrl()) - .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString()) - .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) - .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) - .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) - .build(); + return ImmutableMessageRouterPublishRequest.builder() + .contentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) + .sinkDefinition(parsedSink) + .build(); } - DmaapPublisherConfiguration getDmaapUpdatePublisherConfig() { + MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() { RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_UPDATE)).get(); MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink); - return new ImmutableDmaapPublisherConfiguration.Builder() - .endpointUrl(parsedSink.topicUrl()) - .dmaapTopicName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapTopicName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapProtocol").getAsString()) - .dmaapContentType(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapContentType").getAsString()) - .dmaapHostName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString()) - .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) - .build(); + return ImmutableMessageRouterPublishRequest.builder() + .contentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) + .sinkDefinition(parsedSink) + .build(); } AaiClientConfiguration getAaiClientConfig() { @@ -126,28 +102,15 @@ class CbsContentParser { .build(); } - DmaapConsumerConfiguration getDmaapConsumerConfig() { + MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() { RawDataStream<JsonObject> source = DataStreams.namedSources(jsonObject).find(streamWithName(VES_REG_OUTPUT)).get(); MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source); - return new ImmutableDmaapConsumerConfiguration.Builder() - .endpointUrl(parsedSource.topicUrl()) - .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt()) - .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString()) - .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString()) - .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString()) - .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) - .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) - .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) - .build(); + return ImmutableMessageRouterSubscribeRequest.builder() + .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) + .sourceDefinition(parsedSource) + .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) + .timeout(Duration.ofMillis(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsLong())) + .build(); } }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java index 6363356f..7b87415b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java @@ -23,6 +23,8 @@ package org.onap.dcaegen2.services.prh.configuration; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.springframework.core.io.Resource; /** @@ -32,12 +34,12 @@ public interface Config { Resource getGitInfo(); - DmaapConsumerConfiguration getDmaapConsumerConfiguration(); + MessageRouterSubscribeRequest getMessageRouterSubscribeRequest(); AaiClientConfiguration getAaiClientConfiguration(); - DmaapPublisherConfiguration getDmaapPublisherConfiguration(); + MessageRouterPublishRequest getMessageRouterPublishRequest(); - DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration(); + MessageRouterPublishRequest getMessageRouterUpdatePublishRequest(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java index 7355cf48..f18f1d90 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java @@ -22,6 +22,7 @@ package org.onap.dcaegen2.services.prh.configuration; import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask; import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTaskImpl; +import org.onap.dcaegen2.services.prh.tasks.MessageRouterPublisherResolver; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -31,12 +32,12 @@ public class DmaapPublisherTaskConfig { @Bean(name = "ReadyPublisherTask") @Autowired public DmaapPublisherTask getReadyPublisherTask(final Config config) { - return new DmaapPublisherTaskImpl(config::getDmaapPublisherConfiguration); + return new DmaapPublisherTaskImpl(config::getMessageRouterPublishRequest, new MessageRouterPublisherResolver()); } @Bean(name = "UpdatePublisherTask") @Autowired public DmaapPublisherTask getUpdatePublisherTask(final Config config) { - return new DmaapPublisherTaskImpl(config::getDmaapUpdatePublisherConfiguration); + return new DmaapPublisherTaskImpl(config::getMessageRouterUpdatePublishRequest, new MessageRouterPublisherResolver()); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java index 4b48fa30..01ef2063 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java @@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.prh.configuration; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -47,11 +47,11 @@ public abstract class PrhAppConfig implements Config { AaiClientConfiguration aaiClientConfiguration; - DmaapConsumerConfiguration dmaapConsumerConfiguration; + MessageRouterSubscribeRequest messageRouterSubscribeRequest; - DmaapPublisherConfiguration dmaapPublisherConfiguration; + MessageRouterPublishRequest messageRouterPublishRequest; - DmaapPublisherConfiguration dmaapUpdatePublisherConfiguration; + MessageRouterPublishRequest messageRouterUpdatePublishRequest; @Value("classpath:git_info.json") private Resource gitInfo; @@ -67,8 +67,8 @@ public abstract class PrhAppConfig implements Config { } @Override - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return dmaapConsumerConfiguration; + public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() { + return messageRouterSubscribeRequest; } @Override @@ -77,12 +77,12 @@ public abstract class PrhAppConfig implements Config { } @Override - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return dmaapPublisherConfiguration; + public MessageRouterPublishRequest getMessageRouterPublishRequest() { + return messageRouterPublishRequest; } @Override - public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() { - return dmaapUpdatePublisherConfiguration; + public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() { + return messageRouterUpdatePublishRequest; } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 43d6922a..b3d84562 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -20,6 +20,23 @@ package org.onap.dcaegen2.services.prh.service; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Optional; +import java.util.stream.StreamSupport; + import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; @@ -37,21 +54,6 @@ import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIA import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import java.util.Optional; -import java.util.stream.StreamSupport; -import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.util.StringUtils; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 @@ -77,34 +79,21 @@ public class DmaapConsumerJsonParser { * @param monoMessage - results from DMaaP * @return reactive DMaaPModel */ - public Flux<ConsumerDmaapModel> getJsonObject(Mono<JsonElement> monoMessage) { - return monoMessage - .flatMapMany(this::getConsumerDmaapModelFromJsonArray); + public Flux<ConsumerDmaapModel> getJsonObject(Mono<MessageRouterSubscribeResponse> monoMessage) { + return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items())); } - private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { - LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", jsonElement); - - if (jsonElement instanceof JsonObject) { - LOGGER.debug("Element is JsonObject"); - return create(Flux.just((JsonObject) jsonElement)); - } + private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonArray items) { + LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items); - if (jsonElement instanceof JsonArray) { - LOGGER.debug("Element is JsonArray"); - JsonArray jsonArray = (JsonArray) jsonElement; - if (jsonArray.size() == 0) { - LOGGER.debug("Nothing to consume from DMaaP"); - return Flux.empty(); - } - return create( - Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonArray.spliterator(), false) - .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) - .orElseGet(JsonObject::new))))); + if (items.size() == 0) { + LOGGER.debug("Nothing to consume from DMaaP"); + return Flux.empty(); } - - LOGGER.warn("Element is neither JSON Object or Array"); - return Flux.empty(); + return create( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java index 02691446..0b26890d 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java @@ -32,21 +32,14 @@ import org.onap.dcaegen2.services.prh.model.bbs.ImmutableRelationshipWrapper; import org.onap.dcaegen2.services.prh.model.bbs.RelationshipWrapper; import org.onap.dcaegen2.services.prh.model.utils.GsonSerializer; import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*; import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder; -import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; - - import java.util.Arrays; import java.util.List; import java.util.function.Function; @@ -66,7 +59,7 @@ public class BbsActionsTaskImpl implements BbsActionsTask { @Autowired BbsActionsTaskImpl(Config config) { - this(config, RxHttpClient.create(new SslFactory().createInsecureClientContext())); + this(config, RxHttpClientFactory.createInsecure()); } BbsActionsTaskImpl(Config config, RxHttpClient httpClient) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index 3a630a40..5fc41d93 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -20,20 +20,15 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; - import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; - import reactor.core.publisher.Flux; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ interface DmaapConsumerTask { - Flux<ConsumerDmaapModel> execute(String object) throws SSLException; - - DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index d3086cbe..f46e2cc9 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -20,15 +20,11 @@ package org.onap.dcaegen2.services.prh.tasks; -import com.google.gson.JsonElement; -import java.util.Optional; -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -46,33 +42,26 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); private final Config config; private final DmaapConsumerJsonParser dmaapConsumerJsonParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; + @Autowired public DmaapConsumerTaskImpl(Config config) { - this(config, new DmaapConsumerJsonParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); + this(config, new DmaapConsumerJsonParser()); } - DmaapConsumerTaskImpl(Config prhAppConfig, - DmaapConsumerJsonParser dmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) { + DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) { this.config = prhAppConfig; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - this.httpClientFactory = httpClientFactory; } @Override - public Flux<ConsumerDmaapModel> execute(String object) throws SSLException { - DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); + public Flux<ConsumerDmaapModel> execute(String object) { + MessageRouterSubscriber messageRouterSubscriberClient = + new MessageRouterSubscriberResolver().resolveClient(); LOGGER.debug("Method called with arg {}", object); - Mono<JsonElement> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse( - Optional.empty()); + Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriberClient + .get(config.getMessageRouterSubscribeRequest()); return dmaapConsumerJsonParser.getJsonObject(response); } - @Override - public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException { - return httpClientFactory.create(config.getDmaapConsumerConfiguration()); - } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index 7fc596c1..f63f4d76 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -20,10 +20,10 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -31,16 +31,8 @@ import reactor.core.publisher.Mono; */ public interface DmaapPublisherTask { - /** - * - * Does not work reactive version with DMaaP MR - to be investigated why in future - * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); - * */ - @Deprecated - Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> - execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException; - Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); + Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; - DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;; + Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 55a8bb58..3a724884 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.prh.tasks; +import com.google.gson.JsonPrimitive; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; @@ -27,17 +28,14 @@ import org.apache.http.impl.client.DefaultHttpClient; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.uri.URI; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import javax.net.ssl.SSLException; -import java.util.Optional; import java.util.function.Supplier; /** @@ -46,70 +44,52 @@ import java.util.function.Supplier; public class DmaapPublisherTaskImpl implements DmaapPublisherTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private final Supplier<DmaapPublisherConfiguration> config; + + private final Supplier<MessageRouterPublishRequest> config; + private final MessageRouterPublisherResolver messageRouterPublisherClientResolver; private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl(); - private final PublisherReactiveHttpClientFactory httpClientFactory; - public DmaapPublisherTaskImpl(final Supplier<DmaapPublisherConfiguration> config) { - this(config, new PublisherReactiveHttpClientFactory( - new DmaaPRestTemplateFactory(), - new PnfReadyJsonBodyBuilderImpl())); - } - DmaapPublisherTaskImpl( - Supplier<DmaapPublisherConfiguration> config, - PublisherReactiveHttpClientFactory httpClientFactory) { + public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> config, MessageRouterPublisherResolver messageRouterPublisherClientResolver) { this.config = config; - this.httpClientFactory = httpClientFactory; + this.messageRouterPublisherClientResolver = messageRouterPublisherClientResolver; } @Override - public Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> - execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException, SSLException { + public Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } - DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient(); + MessageRouterPublisher messageRouterPublisher = messageRouterPublisherClientResolver.resolveClient(); LOGGER.info("Method called with arg {}", consumerDmaapModel); - return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty()); - } - - @Override - public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException { - return httpClientFactory.create(config.get()); - + String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel); + return messageRouterPublisher.put( + config.get(), + Flux.just(json).map(JsonPrimitive::new)); } /** * * Does not work reactive version with DMaaP MR - to be investigated why in future - * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); + * As WA please use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); * */ @Override public Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) { String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel); - DefaultHttpClient httpClient = new DefaultHttpClient(); - HttpPost postRequest = new HttpPost(getUrl()); - try { - StringEntity input = new StringEntity(json); - input.setContentType(config.get().dmaapContentType()); - postRequest.setEntity(input); - HttpResponse response = httpClient.execute(postRequest); - return Mono.just(response); - } catch (Exception e) { - LOGGER.warn("Publishing to DMaaP MR failed: {}", e); - return Mono.error(e); + try (DefaultHttpClient httpClient = new DefaultHttpClient()) { + HttpPost postRequest = new HttpPost(config.get().sinkDefinition().topicUrl()); + try { + StringEntity input = new StringEntity(json); + input.setContentType(config.get().contentType()); + postRequest.setEntity(input); + HttpResponse response = httpClient.execute(postRequest); + return Mono.just(response); + } catch (Exception e) { + LOGGER.warn("Publishing to DMaaP MR failed: {}", e); + return Mono.error(e); + } } } - private String getUrl() { - return (new URI.URIBuilder()).scheme(config.get().dmaapProtocol()) - .host(config.get().dmaapHostName()) - .port(config.get().dmaapPortNumber()).path(this.createRequestPath()).build() - .toString(); - } - private String createRequestPath() { - return "/" + config.get().dmaapTopicName(); - } }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java new file mode 100644 index 00000000..2f4e3867 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.springframework.stereotype.Component; + +@Component +public class MessageRouterPublisherResolver { + + public MessageRouterPublisher resolveClient() { + return DmaapClientFactory.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java new file mode 100644 index 00000000..63930ef7 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.springframework.stereotype.Component; + +@Component +public class MessageRouterSubscriberResolver { + + public MessageRouterSubscriber resolveClient() { + return DmaapClientFactory.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 72ec0cac..4b3436e5 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -25,6 +25,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.logging.MdcVariables; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -50,6 +51,7 @@ public class ScheduledTasks { private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class); private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private final DmaapConsumerTask dmaapConsumerTask; private final DmaapPublisherTask dmaapReadyProducerTask; private final DmaapPublisherTask dmaapUpdateProducerTask; @@ -208,7 +210,7 @@ public class ScheduledTasks { * Marked as deprecated due to problems with DMaaP MR, to be fixed in future */ @Deprecated - private Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> + private Flux<MessageRouterPublishResponse> publishToDmaapConfiguration(final State state) { try { if (state.ActivationStatus) { @@ -217,8 +219,8 @@ public class ScheduledTasks { } return dmaapReadyProducerTask.execute(state.DmaapModel); - } catch (PrhTaskException | SSLException e) { - return Mono.error(e); + } catch (PrhTaskException e) { + return Flux.error(e); } } |