diff options
author | pwielebs <piotr.wielebski@nokia.com> | 2019-05-16 17:44:45 +0200 |
---|---|---|
committer | pwielebs <piotr.wielebski@nokia.com> | 2019-05-22 14:01:54 +0200 |
commit | 2cf649dda43c7fc7650b5d0047ccc57108918724 (patch) | |
tree | 03d07378786376e077f7d95a6a98c4f66ab85719 /prh-app-server/src | |
parent | a4f457e46a336a30ceea69a742e8b8aa8f2e720f (diff) |
Align PRH to El Alto SDK
Change-Id: I65c445d76092e11084fb60c68740e1321b35708c
Issue-ID: DCAEGEN2-1501
Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
Diffstat (limited to 'prh-app-server/src')
22 files changed, 360 insertions, 585 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java index 3ff81e1f..889dae20 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java @@ -25,8 +25,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientC import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +50,9 @@ import java.util.Optional; public class CbsConfiguration extends PrhAppConfig { private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfiguration.class); private AaiClientConfiguration aaiClientCBSConfiguration; - private DmaapPublisherConfiguration dmaapPublisherCBSConfiguration; - private DmaapConsumerConfiguration dmaapConsumerCBSConfiguration; - private DmaapPublisherConfiguration dmaapUpdatePublisherCBSConfiguration; + private MessageRouterPublishRequest messageRouterCBSPublishRequest; + private MessageRouterSubscribeRequest messageRouterCBSSubscribeRequest; + private MessageRouterPublishRequest messageRouterCBSUpdatePublishRequest; @Autowired private ConsulConfigFileReader consulConfigFileReader; @@ -82,10 +82,10 @@ public class CbsConfiguration extends PrhAppConfig { private void parseCBSConfig(JsonObject jsonObject) { LOGGER.info("Received application configuration: {}", jsonObject); CbsContentParser consulConfigurationParser = new CbsContentParser(jsonObject); - dmaapPublisherCBSConfiguration = consulConfigurationParser.getDmaapPublisherConfig(); - dmaapUpdatePublisherCBSConfiguration = consulConfigurationParser.getDmaapUpdatePublisherConfig(); + messageRouterCBSPublishRequest = consulConfigurationParser.getMessageRouterPublishRequest(); + messageRouterCBSUpdatePublishRequest = consulConfigurationParser.getMessageRouterUpdatePublishRequest(); aaiClientCBSConfiguration = consulConfigurationParser.getAaiClientConfig(); - dmaapConsumerCBSConfiguration = consulConfigurationParser.getDmaapConsumerConfig(); + messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest(); } private void parsingConfigError(Throwable throwable) { @@ -97,13 +97,13 @@ public class CbsConfiguration extends PrhAppConfig { } @Override - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return Optional.ofNullable(dmaapPublisherCBSConfiguration).orElse(super.getDmaapPublisherConfiguration()); + public MessageRouterPublishRequest getMessageRouterPublishRequest() { + return Optional.ofNullable(messageRouterCBSPublishRequest).orElse(super.getMessageRouterPublishRequest()); } @Override - public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() { - return Optional.ofNullable(dmaapUpdatePublisherCBSConfiguration).orElse(super.getDmaapPublisherConfiguration()); + public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() { + return Optional.ofNullable(messageRouterCBSUpdatePublishRequest).orElse(super.getMessageRouterUpdatePublishRequest()); } @Override @@ -112,7 +112,7 @@ public class CbsConfiguration extends PrhAppConfig { } @Override - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return Optional.ofNullable(dmaapConsumerCBSConfiguration).orElse(super.getDmaapConsumerConfiguration()); + public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() { + return Optional.ofNullable(messageRouterCBSSubscribeRequest).orElse(super.getMessageRouterSubscribeRequest()); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java index f19eb3e6..51d86399 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java @@ -30,11 +30,12 @@ import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientC import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import java.time.Duration; import java.util.Map; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName; @@ -47,7 +48,6 @@ class CbsContentParser { private static final String SECURITY_TRUST_STORE_PASS_PATH = "security.trustStorePasswordPath"; private static final String SECURITY_KEY_STORE_PATH = "security.keyStorePath"; private static final String SECURITY_KEY_STORE_PASS_PATH = "security.keyStorePasswordPath"; - private static final String SECURITY_ENABLE_DMAAP_CERT_AUTH = "security.enableDmaapCertAuth"; private static final String CONFIG = "config"; private static final String PNF_UPDATE = "pnf-update"; private static final String PNF_READY = "pnf-ready"; @@ -59,48 +59,24 @@ class CbsContentParser { this.jsonObject = jsonObject.getAsJsonObject(CONFIG); } - DmaapPublisherConfiguration getDmaapPublisherConfig() { + MessageRouterPublishRequest getMessageRouterPublishRequest() { RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_READY)).get(); MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink); - return new ImmutableDmaapPublisherConfiguration.Builder() - .endpointUrl(parsedSink.topicUrl()) - .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString()) - .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) - .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) - .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) - .build(); + return ImmutableMessageRouterPublishRequest.builder() + .contentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) + .sinkDefinition(parsedSink) + .build(); } - DmaapPublisherConfiguration getDmaapUpdatePublisherConfig() { + MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() { RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_UPDATE)).get(); MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink); - return new ImmutableDmaapPublisherConfiguration.Builder() - .endpointUrl(parsedSink.topicUrl()) - .dmaapTopicName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapTopicName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapProtocol").getAsString()) - .dmaapContentType(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapContentType").getAsString()) - .dmaapHostName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString()) - .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) - .build(); + return ImmutableMessageRouterPublishRequest.builder() + .contentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) + .sinkDefinition(parsedSink) + .build(); } AaiClientConfiguration getAaiClientConfig() { @@ -126,28 +102,15 @@ class CbsContentParser { .build(); } - DmaapConsumerConfiguration getDmaapConsumerConfig() { + MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() { RawDataStream<JsonObject> source = DataStreams.namedSources(jsonObject).find(streamWithName(VES_REG_OUTPUT)).get(); MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source); - return new ImmutableDmaapConsumerConfiguration.Builder() - .endpointUrl(parsedSource.topicUrl()) - .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt()) - .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString()) - .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString()) - .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString()) - .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) - .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) - .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) - .build(); + return ImmutableMessageRouterSubscribeRequest.builder() + .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) + .sourceDefinition(parsedSource) + .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) + .timeout(Duration.ofMillis(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsLong())) + .build(); } }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java index 6363356f..7b87415b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java @@ -23,6 +23,8 @@ package org.onap.dcaegen2.services.prh.configuration; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.springframework.core.io.Resource; /** @@ -32,12 +34,12 @@ public interface Config { Resource getGitInfo(); - DmaapConsumerConfiguration getDmaapConsumerConfiguration(); + MessageRouterSubscribeRequest getMessageRouterSubscribeRequest(); AaiClientConfiguration getAaiClientConfiguration(); - DmaapPublisherConfiguration getDmaapPublisherConfiguration(); + MessageRouterPublishRequest getMessageRouterPublishRequest(); - DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration(); + MessageRouterPublishRequest getMessageRouterUpdatePublishRequest(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java index 7355cf48..f18f1d90 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java @@ -22,6 +22,7 @@ package org.onap.dcaegen2.services.prh.configuration; import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask; import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTaskImpl; +import org.onap.dcaegen2.services.prh.tasks.MessageRouterPublisherResolver; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -31,12 +32,12 @@ public class DmaapPublisherTaskConfig { @Bean(name = "ReadyPublisherTask") @Autowired public DmaapPublisherTask getReadyPublisherTask(final Config config) { - return new DmaapPublisherTaskImpl(config::getDmaapPublisherConfiguration); + return new DmaapPublisherTaskImpl(config::getMessageRouterPublishRequest, new MessageRouterPublisherResolver()); } @Bean(name = "UpdatePublisherTask") @Autowired public DmaapPublisherTask getUpdatePublisherTask(final Config config) { - return new DmaapPublisherTaskImpl(config::getDmaapUpdatePublisherConfiguration); + return new DmaapPublisherTaskImpl(config::getMessageRouterUpdatePublishRequest, new MessageRouterPublisherResolver()); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java index 4b48fa30..01ef2063 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java @@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.prh.configuration; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -47,11 +47,11 @@ public abstract class PrhAppConfig implements Config { AaiClientConfiguration aaiClientConfiguration; - DmaapConsumerConfiguration dmaapConsumerConfiguration; + MessageRouterSubscribeRequest messageRouterSubscribeRequest; - DmaapPublisherConfiguration dmaapPublisherConfiguration; + MessageRouterPublishRequest messageRouterPublishRequest; - DmaapPublisherConfiguration dmaapUpdatePublisherConfiguration; + MessageRouterPublishRequest messageRouterUpdatePublishRequest; @Value("classpath:git_info.json") private Resource gitInfo; @@ -67,8 +67,8 @@ public abstract class PrhAppConfig implements Config { } @Override - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return dmaapConsumerConfiguration; + public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() { + return messageRouterSubscribeRequest; } @Override @@ -77,12 +77,12 @@ public abstract class PrhAppConfig implements Config { } @Override - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return dmaapPublisherConfiguration; + public MessageRouterPublishRequest getMessageRouterPublishRequest() { + return messageRouterPublishRequest; } @Override - public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() { - return dmaapUpdatePublisherConfiguration; + public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() { + return messageRouterUpdatePublishRequest; } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 43d6922a..b3d84562 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -20,6 +20,23 @@ package org.onap.dcaegen2.services.prh.service; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Optional; +import java.util.stream.StreamSupport; + import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; @@ -37,21 +54,6 @@ import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIA import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import java.util.Optional; -import java.util.stream.StreamSupport; -import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.util.StringUtils; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 @@ -77,34 +79,21 @@ public class DmaapConsumerJsonParser { * @param monoMessage - results from DMaaP * @return reactive DMaaPModel */ - public Flux<ConsumerDmaapModel> getJsonObject(Mono<JsonElement> monoMessage) { - return monoMessage - .flatMapMany(this::getConsumerDmaapModelFromJsonArray); + public Flux<ConsumerDmaapModel> getJsonObject(Mono<MessageRouterSubscribeResponse> monoMessage) { + return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items())); } - private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { - LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", jsonElement); - - if (jsonElement instanceof JsonObject) { - LOGGER.debug("Element is JsonObject"); - return create(Flux.just((JsonObject) jsonElement)); - } + private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonArray items) { + LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items); - if (jsonElement instanceof JsonArray) { - LOGGER.debug("Element is JsonArray"); - JsonArray jsonArray = (JsonArray) jsonElement; - if (jsonArray.size() == 0) { - LOGGER.debug("Nothing to consume from DMaaP"); - return Flux.empty(); - } - return create( - Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonArray.spliterator(), false) - .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) - .orElseGet(JsonObject::new))))); + if (items.size() == 0) { + LOGGER.debug("Nothing to consume from DMaaP"); + return Flux.empty(); } - - LOGGER.warn("Element is neither JSON Object or Array"); - return Flux.empty(); + return create( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java index 02691446..0b26890d 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java @@ -32,21 +32,14 @@ import org.onap.dcaegen2.services.prh.model.bbs.ImmutableRelationshipWrapper; import org.onap.dcaegen2.services.prh.model.bbs.RelationshipWrapper; import org.onap.dcaegen2.services.prh.model.utils.GsonSerializer; import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*; import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder; -import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; - - import java.util.Arrays; import java.util.List; import java.util.function.Function; @@ -66,7 +59,7 @@ public class BbsActionsTaskImpl implements BbsActionsTask { @Autowired BbsActionsTaskImpl(Config config) { - this(config, RxHttpClient.create(new SslFactory().createInsecureClientContext())); + this(config, RxHttpClientFactory.createInsecure()); } BbsActionsTaskImpl(Config config, RxHttpClient httpClient) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index 3a630a40..5fc41d93 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -20,20 +20,15 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; - import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; - import reactor.core.publisher.Flux; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ interface DmaapConsumerTask { - Flux<ConsumerDmaapModel> execute(String object) throws SSLException; - - DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index d3086cbe..f46e2cc9 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -20,15 +20,11 @@ package org.onap.dcaegen2.services.prh.tasks; -import com.google.gson.JsonElement; -import java.util.Optional; -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -46,33 +42,26 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); private final Config config; private final DmaapConsumerJsonParser dmaapConsumerJsonParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; + @Autowired public DmaapConsumerTaskImpl(Config config) { - this(config, new DmaapConsumerJsonParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); + this(config, new DmaapConsumerJsonParser()); } - DmaapConsumerTaskImpl(Config prhAppConfig, - DmaapConsumerJsonParser dmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) { + DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) { this.config = prhAppConfig; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - this.httpClientFactory = httpClientFactory; } @Override - public Flux<ConsumerDmaapModel> execute(String object) throws SSLException { - DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); + public Flux<ConsumerDmaapModel> execute(String object) { + MessageRouterSubscriber messageRouterSubscriberClient = + new MessageRouterSubscriberResolver().resolveClient(); LOGGER.debug("Method called with arg {}", object); - Mono<JsonElement> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse( - Optional.empty()); + Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriberClient + .get(config.getMessageRouterSubscribeRequest()); return dmaapConsumerJsonParser.getJsonObject(response); } - @Override - public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException { - return httpClientFactory.create(config.getDmaapConsumerConfiguration()); - } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index 7fc596c1..f63f4d76 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -20,10 +20,10 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -31,16 +31,8 @@ import reactor.core.publisher.Mono; */ public interface DmaapPublisherTask { - /** - * - * Does not work reactive version with DMaaP MR - to be investigated why in future - * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); - * */ - @Deprecated - Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> - execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException; - Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); + Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; - DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;; + Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 55a8bb58..3a724884 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.prh.tasks; +import com.google.gson.JsonPrimitive; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; @@ -27,17 +28,14 @@ import org.apache.http.impl.client.DefaultHttpClient; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.uri.URI; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import javax.net.ssl.SSLException; -import java.util.Optional; import java.util.function.Supplier; /** @@ -46,70 +44,52 @@ import java.util.function.Supplier; public class DmaapPublisherTaskImpl implements DmaapPublisherTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private final Supplier<DmaapPublisherConfiguration> config; + + private final Supplier<MessageRouterPublishRequest> config; + private final MessageRouterPublisherResolver messageRouterPublisherClientResolver; private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl(); - private final PublisherReactiveHttpClientFactory httpClientFactory; - public DmaapPublisherTaskImpl(final Supplier<DmaapPublisherConfiguration> config) { - this(config, new PublisherReactiveHttpClientFactory( - new DmaaPRestTemplateFactory(), - new PnfReadyJsonBodyBuilderImpl())); - } - DmaapPublisherTaskImpl( - Supplier<DmaapPublisherConfiguration> config, - PublisherReactiveHttpClientFactory httpClientFactory) { + public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> config, MessageRouterPublisherResolver messageRouterPublisherClientResolver) { this.config = config; - this.httpClientFactory = httpClientFactory; + this.messageRouterPublisherClientResolver = messageRouterPublisherClientResolver; } @Override - public Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> - execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException, SSLException { + public Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } - DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient(); + MessageRouterPublisher messageRouterPublisher = messageRouterPublisherClientResolver.resolveClient(); LOGGER.info("Method called with arg {}", consumerDmaapModel); - return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty()); - } - - @Override - public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException { - return httpClientFactory.create(config.get()); - + String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel); + return messageRouterPublisher.put( + config.get(), + Flux.just(json).map(JsonPrimitive::new)); } /** * * Does not work reactive version with DMaaP MR - to be investigated why in future - * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); + * As WA please use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); * */ @Override public Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) { String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel); - DefaultHttpClient httpClient = new DefaultHttpClient(); - HttpPost postRequest = new HttpPost(getUrl()); - try { - StringEntity input = new StringEntity(json); - input.setContentType(config.get().dmaapContentType()); - postRequest.setEntity(input); - HttpResponse response = httpClient.execute(postRequest); - return Mono.just(response); - } catch (Exception e) { - LOGGER.warn("Publishing to DMaaP MR failed: {}", e); - return Mono.error(e); + try (DefaultHttpClient httpClient = new DefaultHttpClient()) { + HttpPost postRequest = new HttpPost(config.get().sinkDefinition().topicUrl()); + try { + StringEntity input = new StringEntity(json); + input.setContentType(config.get().contentType()); + postRequest.setEntity(input); + HttpResponse response = httpClient.execute(postRequest); + return Mono.just(response); + } catch (Exception e) { + LOGGER.warn("Publishing to DMaaP MR failed: {}", e); + return Mono.error(e); + } } } - private String getUrl() { - return (new URI.URIBuilder()).scheme(config.get().dmaapProtocol()) - .host(config.get().dmaapHostName()) - .port(config.get().dmaapPortNumber()).path(this.createRequestPath()).build() - .toString(); - } - private String createRequestPath() { - return "/" + config.get().dmaapTopicName(); - } }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java new file mode 100644 index 00000000..2f4e3867 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.springframework.stereotype.Component; + +@Component +public class MessageRouterPublisherResolver { + + public MessageRouterPublisher resolveClient() { + return DmaapClientFactory.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java new file mode 100644 index 00000000..63930ef7 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.springframework.stereotype.Component; + +@Component +public class MessageRouterSubscriberResolver { + + public MessageRouterSubscriber resolveClient() { + return DmaapClientFactory.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 72ec0cac..4b3436e5 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -25,6 +25,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.logging.MdcVariables; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -50,6 +51,7 @@ public class ScheduledTasks { private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class); private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private final DmaapConsumerTask dmaapConsumerTask; private final DmaapPublisherTask dmaapReadyProducerTask; private final DmaapPublisherTask dmaapUpdateProducerTask; @@ -208,7 +210,7 @@ public class ScheduledTasks { * Marked as deprecated due to problems with DMaaP MR, to be fixed in future */ @Deprecated - private Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> + private Flux<MessageRouterPublishResponse> publishToDmaapConfiguration(final State state) { try { if (state.ActivationStatus) { @@ -217,8 +219,8 @@ public class ScheduledTasks { } return dmaapReadyProducerTask.execute(state.DmaapModel); - } catch (PrhTaskException | SSLException e) { - return Mono.error(e); + } catch (PrhTaskException e) { + return Flux.error(e); } } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/TestAppConfiguration.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/TestAppConfiguration.java index 9dca398a..fb0b1b43 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/TestAppConfiguration.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/TestAppConfiguration.java @@ -20,51 +20,38 @@ package org.onap.dcaegen2.services.prh; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; + +import java.time.Duration; public class TestAppConfiguration { - public static ImmutableDmaapConsumerConfiguration createDefaultDmaapConsumerConfiguration() { - return new ImmutableDmaapConsumerConfiguration.Builder() - .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT") + public static ImmutableMessageRouterSubscribeRequest createDefaultMessageRouterSubscribeRequest() { + return ImmutableMessageRouterSubscribeRequest.builder() .consumerGroup("OpenDCAE-c12") + .sourceDefinition(ImmutableMessageRouterSource.builder() + .name("the topic") + .topicUrl(String.format("http://%s:%d/events/TOPIC", "www", 1234)) + .build()) .consumerId("c12") - .dmaapContentType("application/json") - .dmaapHostName("message-router.onap.svc.cluster.local") - .dmaapPortNumber(3904) - .dmaapProtocol("http") - .dmaapUserName("admin") - .dmaapUserPassword("admin") - .trustStorePath("/opt/app/prh/local/org.onap.prh.trust.jks") - .trustStorePasswordPath("change_it") - .keyStorePath("/opt/app/prh/local/org.onap.prh.p12") - .keyStorePasswordPath("change_it") - .enableDmaapCertAuth(false) - .dmaapTopicName("/events/unauthenticated.SEC_OTHER_OUTPUT") - .timeoutMs(-1) - .messageLimit(-1) + .timeout(Duration.ofMillis(1)) .build(); } - public static ImmutableDmaapPublisherConfiguration createDefaultDmaapPublisherConfiguration() { - return new ImmutableDmaapPublisherConfiguration.Builder() - .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.PNF_READY") - .dmaapContentType("application/json") - .dmaapHostName("message-router.onap.svc.cluster.local") - .dmaapPortNumber(3904) - .dmaapProtocol("http") - .dmaapUserName("admin") - .dmaapUserPassword("admin") - .trustStorePath("/opt/app/prh/local/org.onap.prh.trust.jks") - .trustStorePasswordPath("change_it") - .keyStorePath("/opt/app/prh/local/org.onap.prh.p12") - .keyStorePasswordPath("change_it") - .enableDmaapCertAuth(false) - .dmaapTopicName("/events/unauthenticated.PNF_READY") + public static ImmutableMessageRouterPublishRequest createDefaultMessageRouterPublishRequest() { + return ImmutableMessageRouterPublishRequest.builder() + .contentType("application/json") + .sinkDefinition(ImmutableMessageRouterSink.builder() + .name("the topic") + .topicUrl(String.format("http://%s:%d/events/TOPIC", "www", 1234)) + .build()) .build(); - } + + } public static ImmutableAaiClientConfiguration createDefaultAaiClientConfiguration() { return new ImmutableAaiClientConfiguration.Builder() diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/ConsulConfigurationParserTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/ConsulConfigurationParserTest.java index 8a2a498f..350cee68 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/ConsulConfigurationParserTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/ConsulConfigurationParserTest.java @@ -20,21 +20,22 @@ package org.onap.dcaegen2.services.prh.configuration; -import static java.lang.ClassLoader.getSystemResource; -import static org.assertj.core.api.Assertions.assertThat; - import com.google.gson.Gson; import com.google.gson.JsonObject; -import java.nio.file.Files; -import java.nio.file.Paths; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.prh.TestAppConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.Duration; + +import static java.lang.ClassLoader.getSystemResource; +import static org.assertj.core.api.Assertions.assertThat; class ConsulConfigurationParserTest { @@ -43,10 +44,8 @@ class ConsulConfigurationParserTest { new String(Files.readAllBytes(Paths.get(getSystemResource("flattened_configuration.json").toURI()))); private final ImmutableAaiClientConfiguration correctAaiClientConfig = TestAppConfiguration.createDefaultAaiClientConfiguration(); - private final ImmutableDmaapConsumerConfiguration correctDmaapConsumerConfig = - TestAppConfiguration.createDefaultDmaapConsumerConfiguration(); - private final ImmutableDmaapPublisherConfiguration correctDmaapPublisherConfig = - TestAppConfiguration.createDefaultDmaapPublisherConfiguration(); + private final ImmutableMessageRouterPublishRequest correctDmaapPublisherConfig = + TestAppConfiguration.createDefaultMessageRouterPublishRequest(); private final CbsContentParser consulConfigurationParser = new CbsContentParser( new Gson().fromJson(correctJson, JsonObject.class)); @@ -63,25 +62,25 @@ class ConsulConfigurationParserTest { assertThat(aaiClientConfig).isEqualToComparingFieldByField(correctAaiClientConfig); } - @Test - void shouldCreateDmaapConsumerConfigurationCorrectly() { - // when - DmaapConsumerConfiguration dmaapConsumerConfig = consulConfigurationParser.getDmaapConsumerConfig(); + void shouldCreateMessageRouterSubscribeRequestCorrectly() { + // given + MessageRouterSubscribeRequest messageRouterSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest(); // then - assertThat(dmaapConsumerConfig).isNotNull(); - assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(correctDmaapConsumerConfig); + assertThat(messageRouterSubscribeRequest.sourceDefinition().topicUrl()).isEqualTo("http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT"); + assertThat(messageRouterSubscribeRequest.consumerGroup()).isEqualTo("OpenDCAE-c12"); + assertThat(messageRouterSubscribeRequest.consumerId()).isEqualTo("c12"); + assertThat(messageRouterSubscribeRequest.timeout()).isEqualTo(Duration.ofMillis(-1)); } - @Test - void shouldCreateDmaapPublisherConfigurationCorrectly() { + void shouldCreateMessageRouterPublishConfigurationCorrectly() { // when - DmaapPublisherConfiguration dmaapPublisherConfig = consulConfigurationParser.getDmaapPublisherConfig(); + MessageRouterPublishRequest messageRouterPublishRequest = consulConfigurationParser.getMessageRouterPublishRequest(); // then - assertThat(dmaapPublisherConfig).isNotNull(); - assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(correctDmaapPublisherConfig); + assertThat(messageRouterPublishRequest.contentType()).isEqualTo("application/json"); + assertThat(messageRouterPublishRequest.sinkDefinition().topicUrl()).isEqualTo("http://dmaap-mr:2222/events/unauthenticated.PNF_READY"); } }
\ No newline at end of file diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java index cdcef07c..98b73142 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java @@ -20,21 +20,24 @@ package org.onap.dcaegen2.services.prh.service; -import static org.mockito.Mockito.spy; - import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import java.util.Optional; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.util.Optional; + +import static org.mockito.Mockito.spy; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 */ @@ -101,6 +104,7 @@ class DmaapConsumerJsonParserTest { .build(); JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message); + MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build(); //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); @@ -109,7 +113,7 @@ class DmaapConsumerJsonParserTest { .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser - .getJsonObject(Mono.just((messageAsJsonArray))).blockFirst(); + .getJsonObject(Mono.just((response))).blockFirst(); //then Assertions.assertNotNull(consumerDmaapModel); Assertions.assertEquals(expectedObject, consumerDmaapModel); @@ -163,15 +167,15 @@ class DmaapConsumerJsonParserTest { .nfRole("gNB") .swVersion("v4.5.0.1") .build(); - JsonArray mesageAsJsonArray = (JsonArray) jsonParser.parse(message); - + JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message); + MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build(); //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = new JsonParser().parse(parsed); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser - .getJsonObject(Mono.just((mesageAsJsonArray))).blockFirst(); + .getJsonObject(Mono.just((response))).blockFirst(); //then Assertions.assertNotNull(consumerDmaapModel); Assertions.assertEquals(expectedObject, consumerDmaapModel); @@ -230,6 +234,7 @@ class DmaapConsumerJsonParserTest { .build(); JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message); + MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build(); //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); @@ -238,7 +243,7 @@ class DmaapConsumerJsonParserTest { .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser - .getJsonObject(Mono.just((messageAsJsonArray))).blockFirst(); + .getJsonObject(Mono.just((response))).blockFirst(); //then Assertions.assertNotNull(consumerDmaapModel); Assertions.assertEquals(expectedObject, consumerDmaapModel); @@ -293,6 +298,7 @@ class DmaapConsumerJsonParserTest { .build(); JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message); + MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build(); //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); @@ -301,7 +307,7 @@ class DmaapConsumerJsonParserTest { .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser - .getJsonObject(Mono.just((messageAsJsonArray))).blockFirst(); + .getJsonObject(Mono.just((response))).blockFirst(); //then Assertions.assertNotNull(consumerDmaapModel); Assertions.assertEquals(expectedObject, consumerDmaapModel); @@ -349,8 +355,9 @@ class DmaapConsumerJsonParserTest { + "}}}]"; JsonArray incorrectMessageAsJsonArray = (JsonArray) jsonParser.parse(incorrectMessage); + MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(incorrectMessageAsJsonArray).build(); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageAsJsonArray))) + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response))) .expectSubscription().thenRequest(1).verifyComplete(); } @@ -394,8 +401,9 @@ class DmaapConsumerJsonParserTest { + "}}}]"; JsonArray jsonWithoutSourceNameAsJsonArray = (JsonArray) jsonParser.parse(jsonWithoutSourceName); + MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(jsonWithoutSourceNameAsJsonArray).build(); StepVerifier - .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutSourceNameAsJsonArray))) + .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response))) .expectSubscription().thenRequest(1) .verifyComplete(); @@ -444,8 +452,9 @@ class DmaapConsumerJsonParserTest { + "}}}]"; JsonArray jsonWithoutIpInformationAsJsonArray = (JsonArray) jsonParser.parse(jsonWithoutIpInformation); + MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(jsonWithoutIpInformationAsJsonArray).build(); - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIpInformationAsJsonArray))) + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response))) .expectSubscription().thenRequest(1).verifyComplete(); } @@ -485,15 +494,15 @@ class DmaapConsumerJsonParserTest { + "}}}"; JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message); - + MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build(); //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = jsonParser.parse(parsed); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); - dmaapConsumerJsonParser.getJsonObject(Mono.just((messageAsJsonArray))); - ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageAsJsonArray))) + dmaapConsumerJsonParser.getJsonObject(Mono.just((response))); + ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(response)) .blockFirst(); //then ConsumerDmaapModel expectedObject = ImmutableConsumerDmaapModel.builder() @@ -560,13 +569,14 @@ class DmaapConsumerJsonParserTest { .build(); JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message); + MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build(); //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = jsonParser.parse(parsed); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); - ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageAsJsonArray))) + ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(response)) .blockFirst(); //then @@ -625,12 +635,12 @@ class DmaapConsumerJsonParserTest { .build(); JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message); + MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build(); //when DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); //then - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageAsJsonArray))) - .expectSubscription().expectNext(expectedObject).expectNext(expectedObject).verifyComplete(); + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response))).expectSubscription().expectNext(expectedObject).expectNext(expectedObject).verifyComplete(); } } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java index 18e1a27a..04388fb7 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java @@ -20,12 +20,6 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; - -import javax.net.ssl.SSLException; - import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; @@ -34,6 +28,10 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + /** @@ -49,7 +47,7 @@ public class AaiPublisherTaskSpy { */ @Bean @Primary - public AaiProducerTask registerSimpleAaiPublisherTask() throws SSLException { + public AaiProducerTask registerSimpleAaiPublisherTask() { CbsConfiguration cbsConfiguration = spy(CbsConfiguration.class); ConsumerDmaapModel consumerDmaapModel = spy(ConsumerDmaapModel.class); doReturn(mock(AaiClientConfiguration.class)).when(cbsConfiguration).getAaiClientConfiguration(); diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java deleted file mode 100644 index 9afa7671..00000000 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * PNF-REGISTRATION-HANDLER - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.services.prh.tasks; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.onap.dcaegen2.services.prh.TestAppConfiguration.createDefaultDmaapConsumerConfiguration; - -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import java.util.Optional; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; -import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18 - */ -class DmaapConsumerTaskImplTest { - - private static ConsumerDmaapModel consumerDmaapModel; - private static DmaapConsumerTaskImpl dmaapConsumerTask; - private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient; - private static DmaapConsumerConfiguration dmaapConsumerConfiguration; - private static String message; - private static String messageContentEmpty; - private static JsonArray jsonArray; - private static JsonArray jsonArrayWrongContent; - - private static CbsConfiguration cbsConfiguration; - - @BeforeAll - static void setUp() { - dmaapConsumerConfiguration = createDefaultDmaapConsumerConfiguration(); - - JsonObject jsonObject = new JsonParser().parse("{\n" - + " \"attachmentPoint\": \"bla-bla-30-3\",\n" - + " \"cvlan\": \"678\",\n" - + " \"svlan\": \"1005\"\n" - + " }").getAsJsonObject(); - - consumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .ipv4("10.16.123.234") - .ipv6("0:0:0:0:0:FFFF:0A10:7BEA") - .correlationId("NOKQTFCOC540002E") - .serialNumber("QTFCOC540002E") - .equipVendor("nokia") - .equipModel("3310") - .equipType("type") - .nfRole("gNB") - .swVersion("v4.5.0.1") - .additionalFields(jsonObject) - .build(); - cbsConfiguration = mock(CbsConfiguration.class); - - message = "[{\"event\": {" - + "\"commonEventHeader\": { " - + " \"sourceName\":\"NOKQTFCOC540002E\"," - + " \"nfNamingCode\":\"gNB\" " - + "}," - + "\"pnfRegistrationFields\": {" - + " \"vendorName\": \"nokia\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"modelNumber\": \"3310\"," - + " \"unitType\": \"type\",\n" - + " \"unitFamily\": \"BBU\"," - + " \"oamV4IpAddress\": \"10.16.123.234\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"," - + " \"additionalFields\": {\"attachmentPoint\": \"bla-bla-30-3\",\"cvlan\": \"678\",\"svlan\": \"1005\"}" - + "}}}]"; - - messageContentEmpty = "[]"; - JsonParser jsonParser = new JsonParser(); - jsonArray = (JsonArray) jsonParser.parse(message); - jsonArrayWrongContent = (JsonArray) jsonParser.parse(messageContentEmpty); - - } - - @Test - void whenPassedObjectDoesNotFit_DoesNotThrowPrhTaskException() throws Exception { - //given - prepareMocksForDmaapConsumer(Optional.of(jsonArrayWrongContent)); - - //when - Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input"); - - //then - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); - assertNull(response.blockFirst()); - } - - @Test - void whenPassedObjectFits_ReturnsCorrectResponse() throws Exception { - //given - prepareMocksForDmaapConsumer(Optional.of(jsonArray)); - - //when - Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input"); - - //then - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); - assertEquals(consumerDmaapModel, response.blockFirst()); - } - - - - private void prepareMocksForDmaapConsumer(Optional<JsonArray> message) throws Exception { - dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class); - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())) - .thenReturn(Mono.just(message.get())); - when(cbsConfiguration.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration); - ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class); - doReturn(dMaaPConsumerReactiveHttpClient).when(httpClientFactory).create(dmaapConsumerConfiguration); - dmaapConsumerTask = new DmaapConsumerTaskImpl(cbsConfiguration, new DmaapConsumerJsonParser(), httpClientFactory); - } -}
\ No newline at end of file diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java index 594575e5..4c95c717 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java @@ -20,19 +20,16 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; - -import javax.net.ssl.SSLException; - import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18 @@ -47,13 +44,10 @@ public class DmaapConsumerTaskSpy { */ @Bean @Primary - public DmaapConsumerTask registerSimpleDmaapConsumerTask() throws SSLException { + public DmaapConsumerTask registerSimpleDmaapConsumerTask() { CbsConfiguration cbsConfiguration = spy(CbsConfiguration.class); - doReturn(mock(DmaapConsumerConfiguration.class)).when(cbsConfiguration).getDmaapConsumerConfiguration(); + doReturn(mock(MessageRouterPublishRequest.class)).when(cbsConfiguration).getMessageRouterPublishRequest(); DmaapConsumerTaskImpl dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(cbsConfiguration)); - DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient = mock( - DMaaPConsumerReactiveHttpClient.class); - doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient(); return dmaapConsumerTask; } } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java index 77028a34..7a68bc8c 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java @@ -20,20 +20,18 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; - -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import java.util.function.Supplier; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ @@ -47,14 +45,10 @@ public class DmaapProducerTaskSpy { */ @Bean @Primary - public DmaapPublisherTask registerSimpleDmaapPublisherTask() throws SSLException { - final CbsConfiguration cbsConfiguration = spy(CbsConfiguration.class); - final Supplier<DmaapPublisherConfiguration> configSupplier = () -> cbsConfiguration.getDmaapPublisherConfiguration(); - doReturn(mock(DmaapPublisherConfiguration.class)).when(cbsConfiguration).getDmaapPublisherConfiguration(); - final DmaapPublisherTaskImpl dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(configSupplier)); - final DMaaPPublisherReactiveHttpClient extendedDmaapProducerHttpClient = mock( - DMaaPPublisherReactiveHttpClient.class); - doReturn(extendedDmaapProducerHttpClient).when(dmaapPublisherTask).resolveClient(); - return dmaapPublisherTask; + public DmaapPublisherTask registerSimpleDmaapPublisherTask() { + final CbsConfiguration cbsConfiguration = mock(CbsConfiguration.class); + final Supplier<MessageRouterPublishRequest> configSupplier = cbsConfiguration::getMessageRouterPublishRequest; + doReturn(mock(MessageRouterPublishRequest.class)).when(cbsConfiguration).getMessageRouterPublishRequest(); + return spy(new DmaapPublisherTaskImpl(configSupplier, new MessageRouterPublisherResolver())); } } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java index fb4a50ea..6347ad3d 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java @@ -20,136 +20,105 @@ package org.onap.dcaegen2.services.prh.tasks; -import io.netty.handler.codec.http.HttpResponseStatus; +import com.google.gson.JsonPrimitive; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.function.Executable; -import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; -import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -import javax.net.ssl.SSLException; -import java.util.Optional; +import org.onap.dcaegen2.services.prh.integration.junit5.mockito.MockitoExtension; +import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import reactor.core.publisher.Flux; + import java.util.function.Supplier; -import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.*; -import static org.onap.dcaegen2.services.prh.TestAppConfiguration.createDefaultDmaapPublisherConfiguration; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18 */ +@ExtendWith(MockitoExtension.class) class DmaapPublisherTaskImplTest { - private static ConsumerDmaapModel consumerDmaapModel; private static DmaapPublisherTaskImpl dmaapPublisherTask; - private static DMaaPPublisherReactiveHttpClient dMaaPPublisherReactiveHttpClient; - private static CbsConfiguration cbsConfiguration; - private static DmaapPublisherConfiguration dmaapPublisherConfiguration; - private Optional<RequestDiagnosticContext> requestDiagnosticContextOptionalMock; - private DmaapModel dmaapModel; - private PublisherReactiveHttpClientFactory publisherReactiveHttpClientFactory; - private Supplier<DmaapPublisherConfiguration> configSupplier; + + @Mock + private static MessageRouterPublisherResolver messageRouterPublisherClientResolver; + @Mock + private static MessageRouterPublisher messageRouterPublisher; + + private Supplier<MessageRouterPublishRequest> configSupplier; + + + @Captor + private ArgumentCaptor<Flux<JsonPrimitive>> fluxCaptor; @BeforeEach - public void beforeEach() throws SSLException { - dmaapPublisherConfiguration = createDefaultDmaapPublisherConfiguration(); - consumerDmaapModel = mock(ConsumerDmaapModel.class); - cbsConfiguration = mock(CbsConfiguration.class); - requestDiagnosticContextOptionalMock = Optional.empty(); - dmaapModel = mock(DmaapModel.class); - dMaaPPublisherReactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class); - publisherReactiveHttpClientFactory = mock(PublisherReactiveHttpClientFactory.class); - when(cbsConfiguration.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); - when(publisherReactiveHttpClientFactory.create(dmaapPublisherConfiguration)) - .thenReturn(dMaaPPublisherReactiveHttpClient); - configSupplier = () -> cbsConfiguration.getDmaapPublisherConfiguration(); + void beforeEach() { + when(messageRouterPublisherClientResolver.resolveClient()).thenReturn(messageRouterPublisher); + MessageRouterPublishRequest mrRequest = createMRRequest(); + configSupplier = () -> mrRequest; } @Test void execute_whenPassedObjectDoesntFit_ThrowsPrhTaskException() { //given - dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier); + dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, messageRouterPublisherClientResolver); //when Executable executableFunction = () -> dmaapPublisherTask.execute(null); //then assertThrows(PrhTaskException.class, executableFunction, "The specified parameter is incorrect"); } - @Test - void execute_whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException, SSLException { + void execute_whenPassedObjectFits_ReturnsCorrectStatus() throws DmaapNotFoundException { //given - HttpResponseStatus httpResponseStatus = HttpResponseStatus.OK; - HttpResponse httpClientReponse = prepareMocksForTests(httpResponseStatus); - dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory); - + dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, messageRouterPublisherClientResolver); //when - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription() - .expectNext(httpClientReponse); - + dmaapPublisherTask.execute(createConsumerDmaapModel()); //then - verify(dMaaPPublisherReactiveHttpClient, times(1)) - .getDMaaPProducerResponse(consumerDmaapModel, requestDiagnosticContextOptionalMock); - - verifyNoMoreInteractions(dMaaPPublisherReactiveHttpClient); + verify(messageRouterPublisher).put(eq(configSupplier.get()), fluxCaptor.capture()); + assertEquals(new JsonPrimitive("{\"correlationId\":\"NOKQTFCOC540002E\"}"), fluxCaptor.getValue().blockFirst()); } - @Test - void execute_whenPassedObjectFits_butIncorrectResponseReturns() throws DmaapNotFoundException, SSLException { - //given - HttpResponseStatus httpResponseStatus = HttpResponseStatus.UNAUTHORIZED; - HttpResponse httpClientReponse = prepareMocksForTests(httpResponseStatus); - dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory); - //when - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription() - .expectNext(httpClientReponse); - - //then - verify(dMaaPPublisherReactiveHttpClient, times(1)) - .getDMaaPProducerResponse(consumerDmaapModel, requestDiagnosticContextOptionalMock); - verifyNoMoreInteractions(dMaaPPublisherReactiveHttpClient); + private ImmutableConsumerDmaapModel createConsumerDmaapModel() { + return ImmutableConsumerDmaapModel.builder() + .ipv4("10.16.123.234") + .ipv6("0:0:0:0:0:FFFF:0A10:7BEA") + .correlationId("NOKQTFCOC540002E") + .serialNumber("QTFCOC540002E") + .equipVendor("nokia") + .equipModel("3310") + .equipType("type") + .nfRole("gNB") + .swVersion("v4.5.0.1") + .additionalFields(null) + .build(); } - @Test() - void execute_whenConsumerDmaapModelIsNull() { - //given - HttpResponseStatus httpResponseStatus = HttpResponseStatus.UNAUTHORIZED; - HttpResponse httpClientReponse = prepareMocksForTests(httpResponseStatus); - dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory); - assertThrows(DmaapNotFoundException.class, () -> { - dmaapPublisherTask.execute(null); - }); - } + private MessageRouterPublishRequest createMRRequest() { + final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() + .name("the topic") + .topicUrl("http://dmaap-mr:2222/events/unauthenticated.PNF_READY") + .build(); - @Test - public void resolveClient() throws SSLException { - //given - dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory); - //when - DMaaPPublisherReactiveHttpClient dMaaPPublisherReactiveHttpClientResolved = dmaapPublisherTask.resolveClient(); - //then - assertSame(dMaaPPublisherReactiveHttpClientResolved, dMaaPPublisherReactiveHttpClient); + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType("application/json") + .build(); } - - private HttpResponse prepareMocksForTests(HttpResponseStatus httpResponseStatus) { - HttpResponse httpClientResponse = mock(HttpResponse.class); - when(httpClientResponse.statusCode()).thenReturn(httpResponseStatus.code()); - when( - dMaaPPublisherReactiveHttpClient.getDMaaPProducerResponse(dmaapModel, requestDiagnosticContextOptionalMock)) - .thenReturn(Mono.just(httpClientResponse)); - return httpClientResponse; - } - }
\ No newline at end of file |