aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java93
1 files changed, 73 insertions, 20 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index a86a32b8..6ace4aae 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,16 +21,32 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
-import java.util.Set;
import javax.validation.constraints.NotNull;
+import io.vavr.collection.Stream;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
+import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
/**
* Parses the cloud configuration.
@@ -51,6 +67,12 @@ public class CloudConfigParser {
private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING =
"sftp.security.strictHostKeyChecking";
+ private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup";
+ private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId";
+ private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs";
+ private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1;
+ private static final int FIRST_SOURCE_INDEX = 0;
+
private final Properties systemEnvironment;
private final JsonObject jsonObject;
@@ -78,7 +100,7 @@ public class CloudConfigParser {
PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
.publishUrl(getAsString(feedConfig, "publish_url")) //
- .passWord(getAsString(feedConfig, "password")) //
+ .password(getAsString(feedConfig, "password")) //
.userName(getAsString(feedConfig, "username")) //
.trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) //
.trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
@@ -98,24 +120,51 @@ public class CloudConfigParser {
* Get the consumer configuration.
*
* @return the consumer configuration.
- * @throws DatafileTaskException if a member of the configuration is missing.
+ * @throws DatafileTaskException if the configuration is invalid.
*/
- public @NotNull ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
- JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject();
- Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
- if (topics.size() != 1) {
- throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics);
+ public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException {
+ try {
+ MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig();
+ MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest();
+ MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig);
+ return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest);
+ } catch (Exception e) {
+ throw new DatafileTaskException("Could not parse message router consumer configuration", e);
}
- JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
- JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject();
- String topicUrl = getAsString(dmaapInfo, "topic_url");
-
- return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
- .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH))
- .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
- .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH))
- .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
- .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ }
+
+ private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException {
+ return ImmutableMessageRouterSubscriberConfig.builder()
+ .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null)
+ .build();
+ }
+
+ private SecurityKeys createSecurityKeys() throws DatafileTaskException {
+ return ImmutableSecurityKeys.builder()
+ .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)))
+ .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)))
+ .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)))
+ .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)))
+ .build();
+ }
+
+ private boolean isDmaapCertAuthEnabled(JsonObject config) {
+ return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean();
+ }
+
+ private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException {
+ Stream<RawDataStream<JsonObject>> sources = DataStreams.namedSources(jsonObject);
+ if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) {
+ throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources);
+ }
+ RawDataStream<JsonObject> source = sources.get(FIRST_SOURCE_INDEX);
+ MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
+
+ return ImmutableMessageRouterSubscribeRequest.builder()
+ .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP))
+ .sourceDefinition(parsedSource)
+ .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID))
+ .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong()))
.build();
}
@@ -176,4 +225,8 @@ public class CloudConfigParser {
return get(obj, memberName).getAsJsonObject();
}
+ private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException {
+ return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath));
+ }
+
}