diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java | 93 |
1 files changed, 73 insertions, 20 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index a86a32b8..6ace4aae 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,16 +21,32 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; -import java.util.Set; import javax.validation.constraints.NotNull; +import io.vavr.collection.Stream; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore; +import org.onap.dcaegen2.services.sdk.security.ssl.Passwords; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; /** * Parses the cloud configuration. @@ -51,6 +67,12 @@ public class CloudConfigParser { private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING = "sftp.security.strictHostKeyChecking"; + private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup"; + private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId"; + private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs"; + private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1; + private static final int FIRST_SOURCE_INDEX = 0; + private final Properties systemEnvironment; private final JsonObject jsonObject; @@ -78,7 +100,7 @@ public class CloudConfigParser { PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() // .publishUrl(getAsString(feedConfig, "publish_url")) // - .passWord(getAsString(feedConfig, "password")) // + .password(getAsString(feedConfig, "password")) // .userName(getAsString(feedConfig, "username")) // .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) // .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // @@ -98,24 +120,51 @@ public class CloudConfigParser { * Get the consumer configuration. * * @return the consumer configuration. - * @throws DatafileTaskException if a member of the configuration is missing. + * @throws DatafileTaskException if the configuration is invalid. */ - public @NotNull ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException { - JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject(); - Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet(); - if (topics.size() != 1) { - throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics); + public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException { + try { + MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig(); + MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest(); + MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig); + return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest); + } catch (Exception e) { + throw new DatafileTaskException("Could not parse message router consumer configuration", e); } - JsonObject topic = topics.iterator().next().getValue().getAsJsonObject(); - JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject(); - String topicUrl = getAsString(dmaapInfo, "topic_url"); - - return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl) - .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) - .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) - .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) - .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) - .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + } + + private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException { + return ImmutableMessageRouterSubscriberConfig.builder() + .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null) + .build(); + } + + private SecurityKeys createSecurityKeys() throws DatafileTaskException { + return ImmutableSecurityKeys.builder() + .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH))) + .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH))) + .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH))) + .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))) + .build(); + } + + private boolean isDmaapCertAuthEnabled(JsonObject config) { + return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean(); + } + + private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException { + Stream<RawDataStream<JsonObject>> sources = DataStreams.namedSources(jsonObject); + if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) { + throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources); + } + RawDataStream<JsonObject> source = sources.get(FIRST_SOURCE_INDEX); + MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source); + + return ImmutableMessageRouterSubscribeRequest.builder() + .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP)) + .sourceDefinition(parsedSource) + .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID)) + .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong())) .build(); } @@ -176,4 +225,8 @@ public class CloudConfigParser { return get(obj, memberName).getAsJsonObject(); } + private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException { + return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath)); + } + } |