From 95775a1da5c70e01738fb1c939f552b755af6e1d Mon Sep 17 00:00:00 2001 From: pwielebs Date: Thu, 11 Apr 2019 13:18:07 +0200 Subject: read topic_url via Data streams Change-Id: Ibabbeb3fa9b327dafbbb81e58980f8978a509d7e Issue-ID: DCAEGEN2-1360 Signed-off-by: pwielebs --- .../prh/configuration/CbsContentParser.java | 33 +++++++++--- .../services/prh/configuration/PrhAppConfig.java | 63 +--------------------- .../services/prh/tasks/AaiProducerTaskImpl.java | 7 --- .../dcaegen2/services/prh/tasks/AaiQueryTask.java | 1 + .../services/prh/tasks/BbsActionsTask.java | 1 + .../src/test/resources/correct_config.json | 3 ++ .../test/resources/flattened_configuration.json | 3 ++ 7 files changed, 36 insertions(+), 75 deletions(-) diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java index e6bcda50..f19eb3e6 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java @@ -23,8 +23,13 @@ package org.onap.dcaegen2.services.prh.configuration; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; @@ -32,6 +37,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl import java.util.Map; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName; + /** * @author Przemysław Wąsala on 8/21/18 */ @@ -40,7 +47,12 @@ class CbsContentParser { private static final String SECURITY_TRUST_STORE_PASS_PATH = "security.trustStorePasswordPath"; private static final String SECURITY_KEY_STORE_PATH = "security.keyStorePath"; private static final String SECURITY_KEY_STORE_PASS_PATH = "security.keyStorePasswordPath"; + private static final String SECURITY_ENABLE_DMAAP_CERT_AUTH = "security.enableDmaapCertAuth"; private static final String CONFIG = "config"; + private static final String PNF_UPDATE = "pnf-update"; + private static final String PNF_READY = "pnf-ready"; + private static final String VES_REG_OUTPUT = "ves-reg-output"; + private final JsonObject jsonObject; CbsContentParser(JsonObject jsonObject) { @@ -48,8 +60,11 @@ class CbsContentParser { } DmaapPublisherConfiguration getDmaapPublisherConfig() { + RawDataStream sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_READY)).get(); + MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink); + return new ImmutableDmaapPublisherConfiguration.Builder() - .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.PNF_READY") + .endpointUrl(parsedSink.topicUrl()) .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString()) .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt()) @@ -62,13 +77,16 @@ class CbsContentParser { .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get("security.enableDmaapCertAuth").getAsBoolean()) + .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) .build(); } DmaapPublisherConfiguration getDmaapUpdatePublisherConfig() { + RawDataStream sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_UPDATE)).get(); + MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink); + return new ImmutableDmaapPublisherConfiguration.Builder() - .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.PNF_READY") + .endpointUrl(parsedSink.topicUrl()) .dmaapTopicName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapTopicName").getAsString()) .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString()) .dmaapPortNumber(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapPortNumber").getAsInt()) @@ -81,7 +99,7 @@ class CbsContentParser { .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get("security.enableDmaapCertAuth").getAsBoolean()) + .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) .build(); } @@ -109,8 +127,11 @@ class CbsContentParser { } DmaapConsumerConfiguration getDmaapConsumerConfig() { + RawDataStream source = DataStreams.namedSources(jsonObject).find(streamWithName(VES_REG_OUTPUT)).get(); + MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source); + return new ImmutableDmaapConsumerConfiguration.Builder() - .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT") + .endpointUrl(parsedSource.topicUrl()) .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt()) .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString()) .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString()) @@ -126,7 +147,7 @@ class CbsContentParser { .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get("security.enableDmaapCertAuth").getAsBoolean()) + .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) .build(); } } \ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java index 35895abb..4b48fa30 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java @@ -20,8 +20,6 @@ package org.onap.dcaegen2.services.prh.configuration; -import com.google.common.annotations.VisibleForTesting; -import com.google.gson.*; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; @@ -35,13 +33,8 @@ import org.springframework.core.io.Resource; import org.springframework.util.StreamUtils; import javax.annotation.PostConstruct; -import javax.validation.constraints.NotNull; -import java.io.*; -import java.net.URI; -import java.net.URL; +import java.io.IOException; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ServiceLoader; /** * @author Przemysław Wąsala on 4/9/18 @@ -52,13 +45,6 @@ import java.util.ServiceLoader; public abstract class PrhAppConfig implements Config { private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class); - private static final String CONFIG = "configs"; - private static final String AAI = "aai"; - private static final String DMAAP = "dmaap"; - private static final String AAI_CONFIG = "aaiClientConfiguration"; - private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; - private static final String SECURITY = "security"; - AaiClientConfiguration aaiClientConfiguration; DmaapConsumerConfiguration dmaapConsumerConfiguration; @@ -99,51 +85,4 @@ public abstract class PrhAppConfig implements Config { public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() { return dmaapUpdatePublisherConfiguration; } - - - private DmaapPublisherConfiguration deserializeDmaapPublisherConfiguration( - final String dmaapProducerType, - final GsonBuilder gsonBuilder, - final JsonElement rootElement) { - return deserializeType(gsonBuilder, concatenateJsonObjects( - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) - .getAsJsonObject(dmaapProducerType), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapPublisherConfiguration.class); - } - - private void deserializeDmaapConsumerConfiguration(GsonBuilder gsonBuilder, JsonElement rootElement) { - dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) - .getAsJsonObject(DMAAP_CONSUMER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapConsumerConfiguration.class); - } - - private void deserializeAaiConfiguration(GsonBuilder gsonBuilder, JsonElement rootElement) { - aaiClientConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(AAI).getAsJsonObject(AAI_CONFIG), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - AaiClientConfiguration.class); - } - - JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { - return parser.parse(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); - } - - private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) { - source.entrySet() - .forEach(entry -> target.add(entry.getKey(), entry.getValue())); - return target; - } - - private T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject, - @NotNull Class type) { - try { - return gsonBuilder.create().fromJson(jsonObject, type); - } catch (JsonSyntaxException e) { - LOGGER.warn("Failed to parse JSON={}", jsonObject, e); - return null; - } - } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java index 8a6fbf04..4bb5a31c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java @@ -20,18 +20,12 @@ package org.onap.dcaegen2.services.prh.tasks; -import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl; import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; import org.slf4j.Logger; @@ -39,7 +33,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; -import reactor.netty.http.client.HttpClientResponse; /** diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java index dec783ff..f1b900ac 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java @@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; import reactor.core.publisher.Mono; +@FunctionalInterface public interface AaiQueryTask { Mono execute(final AaiModel aaiModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java index d0b8187c..12e13140 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java @@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import reactor.core.publisher.Mono; +@FunctionalInterface public interface BbsActionsTask { Mono execute(ConsumerDmaapModel consumerDmaapModel); } diff --git a/prh-app-server/src/test/resources/correct_config.json b/prh-app-server/src/test/resources/correct_config.json index ec3ebee4..7ec2f74c 100644 --- a/prh-app-server/src/test/resources/correct_config.json +++ b/prh-app-server/src/test/resources/correct_config.json @@ -53,11 +53,13 @@ "security.enableDmaapCertAuth":false, "streams_publishes":{ "pnf-update":{ + "type": "message_router", "dmaap_info":{ "topic_url":"http://dmaap-mr:2222/events/unauthenticated.PNF_UPDATE" } }, "pnf-ready":{ + "type": "message_router", "dmaap_info":{ "topic_url":"http://dmaap-mr:2222/events/unauthenticated.PNF_READY" } @@ -65,6 +67,7 @@ }, "streams_subscribes":{ "ves-reg-output":{ + "type": "message_router", "dmaap_info":{ "topic_url":"http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT" } diff --git a/prh-app-server/src/test/resources/flattened_configuration.json b/prh-app-server/src/test/resources/flattened_configuration.json index 7eb140bc..9997f583 100644 --- a/prh-app-server/src/test/resources/flattened_configuration.json +++ b/prh-app-server/src/test/resources/flattened_configuration.json @@ -55,11 +55,13 @@ "security.enableDmaapCertAuth":false, "streams_publishes":{ "pnf-update":{ + "type": "message_router", "dmaap_info":{ "topic_url":"http://dmaap-mr:2222/events/unauthenticated.PNF_UPDATE" } }, "pnf-ready":{ + "type": "message_router", "dmaap_info":{ "topic_url":"http://dmaap-mr:2222/events/unauthenticated.PNF_READY" } @@ -67,6 +69,7 @@ }, "streams_subscribes":{ "ves-reg-output":{ + "type": "message_router", "dmaap_info":{ "topic_url":"http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT" } -- cgit 1.2.3-korg