aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java211
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java141
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java110
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java105
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java5
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java74
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java24
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java3
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java46
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java20
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java9
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java52
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java58
25 files changed, 610 insertions, 357 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index a38eab8f..d324ca99 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -22,22 +22,38 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
import java.util.ServiceLoader;
+
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
/**
* Holds all configuration for the DFC.
*
@@ -46,105 +62,174 @@ import org.springframework.stereotype.Component;
*/
@Component
+@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
@EnableConfigurationProperties
@ConfigurationProperties("app")
public class AppConfig {
-
- private static final String CONFIG = "configs";
- private static final String DMAAP = "dmaap";
- private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
- private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
- private static final String FTP = "ftp";
- private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
- private static final String SECURITY = "security";
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
- private DmaapConsumerConfiguration dmaapConsumerConfiguration;
- private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private ConsumerConfiguration dmaapConsumerConfiguration;
+ private Map<String, PublisherConfiguration> publishingConfiguration;
private FtpesConfig ftpesConfiguration;
+ private CloudConfigurationProvider cloudConfigurationProvider;
+ @Value("#{systemEnvironment}")
+ Properties systemEnvironment;
+ private Disposable refreshConfigTask = null;
@NotEmpty
private String filepath;
- public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ @Autowired
+ public synchronized void setCloudConfigurationProvider(
+ CloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ this.cloudConfigurationProvider = reactiveCloudConfigurationProvider;
+ }
+
+ public synchronized void setFilepath(String filepath) {
+ this.filepath = filepath;
+ }
+
+ /**
+ * Reads the cloud configuration.
+ */
+ public void initialize() {
+ stop();
+ Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
+ loadConfigurationFromFile();
+
+ refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5))
+ .flatMap(count -> createRefreshConfigurationTask(count, context))
+ .subscribe(e -> logger.info("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ () -> logger.error("Configuration refresh terminated"));
+ }
+
+ public void stop() {
+ if (refreshConfigTask != null) {
+ refreshConfigTask.dispose();
+ refreshConfigTask = null;
+ }
+ }
+
+ public synchronized ConsumerConfiguration getDmaapConsumerConfiguration() {
return dmaapConsumerConfiguration;
}
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherConfiguration;
+ public synchronized boolean isFeedConfigured(String changeIdentifier) {
+ return publishingConfiguration.containsKey(changeIdentifier);
+ }
+
+ public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier)
+ throws DatafileTaskException {
+
+ if (publishingConfiguration == null) {
+ throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier);
+ }
+ PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier);
+ if (cfg == null) {
+ throw new DatafileTaskException(
+ "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier);
+ }
+ return cfg;
}
public synchronized FtpesConfig getFtpesConfiguration() {
return ftpesConfiguration;
}
+ Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) {
+ return Flux.just(counter) //
+ .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) //
+ .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) //
+ .flatMap(this::fetchConfiguration);
+ }
+
+ Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context)
+ .onErrorResume(AppConfig::onErrorResume);
+ }
+
+ private static <R> Mono<R> onErrorResume(Throwable trowable) {
+ logger.error("Could not refresh application configuration {}", trowable.toString());
+ return Mono.empty();
+ }
+
+ private Mono<AppConfig> fetchConfiguration(EnvProperties env) {
+ Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) //
+ .onErrorResume(AppConfig::onErrorResume);
+
+ // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the
+ // other ones does not work
+ EnvProperties dmaapEnv = ImmutableEnvProperties.builder() //
+ .consulHost(env.consulHost()) //
+ .consulPort(env.consulPort()) //
+ .cbsName(env.cbsName()) //
+ .appName(env.appName() + ":dmaap") //
+ .build(); //
+ Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv)
+ .onErrorResume(t -> Mono.just(new JsonObject()));
+
+ return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) //
+ .onErrorResume(AppConfig::onErrorResume);
+ }
+
+ /**
+ * parse configuration
+ *
+ * @param serviceConfigRootObject
+ * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken
+ * from the serviceConfigRootObject
+ * @return this which is updated if successful
+ */
+ private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) {
+ try {
+ CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
+ setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(),
+ parser.getFtpesConfig());
+ } catch (DatafileTaskException e) {
+ logger.error("Could not parse configuration {}", e.toString(), e);
+ }
+ return this;
+ }
+
/**
* Reads the configuration from file.
*/
- public void loadConfigurationFromFile() {
+ void loadConfigurationFromFile() {
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- JsonParser parser = new JsonParser();
- JsonObject jsonObject;
+
try (InputStream inputStream = createInputStream(filepath)) {
- JsonElement rootElement = getJsonElement(parser, inputStream);
- if (rootElement.isJsonObject()) {
- jsonObject = rootElement.getAsJsonObject();
- FtpesConfig ftpesConfig = deserializeType(gsonBuilder,
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
- FtpesConfig.class);
- DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder,
- concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_CONSUMER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapConsumerConfiguration.class);
-
- DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder,
- concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_PRODUCER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapPublisherConfiguration.class);
-
- setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
+ JsonParser parser = new JsonParser();
+ JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject();
+ if (rootObject == null) {
+ throw new JsonSyntaxException("Root is not a json object");
}
+ parseCloudConfig(rootObject, rootObject);
} catch (JsonSyntaxException | IOException e) {
- logger.error("Problem with loading configuration, file: {}", filepath, e);
+ logger.warn("Local configuration file not loaded: {}", filepath, e);
}
}
- synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration,
- DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) {
- this.dmaapConsumerConfiguration = consumerConfiguration;
- this.dmaapPublisherConfiguration = publisherConfiguration;
- this.ftpesConfiguration = ftpesConfig;
+ private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
+ Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
+ if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
+ logger.error(
+ "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
+ consumerConfiguration, publisherConfiguration, ftpesConfig);
+ } else {
+ this.dmaapConsumerConfiguration = consumerConfiguration;
+ this.publishingConfiguration = publisherConfiguration;
+ this.ftpesConfiguration = ftpesConfig;
+ }
}
JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
return parser.parse(new InputStreamReader(inputStream));
}
- private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
- @NotNull Class<T> type) {
- return gsonBuilder.create().fromJson(jsonObject, type);
- }
-
InputStream createInputStream(@NotNull String filepath) throws IOException {
return new BufferedInputStream(new FileInputStream(filepath));
}
- synchronized String getFilepath() {
- return this.filepath;
- }
-
- public synchronized void setFilepath(String filepath) {
- this.filepath = filepath;
- }
-
- private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
- source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue()));
- return target;
- }
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index 6b7860c4..3ac6b2c6 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -18,11 +18,17 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
@@ -32,63 +38,106 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class CloudConfigParser {
-
private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
- private final JsonObject jsonObject;
+ private final JsonObject serviceConfigurationRoot;
+ private final JsonObject dmaapConfigurationRoot;
- CloudConfigParser(JsonObject jsonObject) {
- this.jsonObject = jsonObject;
+ public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) {
+ this.serviceConfigurationRoot = serviceConfigurationRoot;
+ this.dmaapConfigurationRoot = dmaapConfigurationRoot;
}
- DmaapPublisherConfiguration getDmaapPublisherConfig() {
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
- .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
- .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
- .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
- .build();
+ public Map<String, PublisherConfiguration> getDmaapPublisherConfig() throws DatafileTaskException {
+ Iterator<JsonElement> producerCfgs =
+ toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator();
+
+ Map<String, PublisherConfiguration> result = new HashMap<>();
+
+ while (producerCfgs.hasNext()) {
+ JsonObject producerCfg = producerCfgs.next().getAsJsonObject();
+ String feedName = getAsString(producerCfg, "feedName");
+ JsonObject feedConfig = getFeedConfig(feedName);
+
+ PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
+ .publishUrl(getAsString(feedConfig, "publish_url")) //
+ .passWord(getAsString(feedConfig, "password")) //
+ .userName(getAsString(feedConfig, "username")) //
+ .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) //
+ .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
+ .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) //
+ .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
+ .enableDmaapCertAuth(
+ get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) //
+ .logUrl(getAsString(feedConfig, "log_url")) //
+ .build();
+
+ result.put(cfg.changeIdentifier(), cfg);
+ }
+ return result;
}
- DmaapConsumerConfiguration getDmaapConsumerConfig() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt())
- .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
- .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
- .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
- .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
- .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
- .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
+ JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject();
+ Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
+ if (topics.size() != 1) {
+ throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics);
+ }
+ JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
+ JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject();
+ String topicUrl = getAsString(dmaapInfo, "topic_url");
+
+ return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
+ .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH))
+ .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
+ .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH))
+ .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
+ .enableDmaapCertAuth(
+ get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
- FtpesConfig getFtpesConfig() {
+ public FtpesConfig getFtpesConfig() throws DatafileTaskException {
return new ImmutableFtpesConfig.Builder() //
- .keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString())
- .keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString())
- .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString())
- .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) //
+ .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert"))
+ .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword"))
+ .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa"))
+ .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) //
.build();
}
+
+ private static JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException {
+ JsonElement elem = obj.get(memberName);
+ if (elem == null) {
+ throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj);
+ }
+ return elem;
+ }
+
+ private static String getAsString(JsonObject obj, String memberName) throws DatafileTaskException {
+ return get(obj, memberName).getAsString();
+ }
+
+ private JsonObject getFeedConfig(String feedName) throws DatafileTaskException {
+ JsonElement elem = dmaapConfigurationRoot.get(feedName);
+ if (elem == null) {
+ elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under
+ // serviceConfigurationRoot
+ }
+ return elem.getAsJsonObject();
+ }
+
+ private static JsonArray toArray(JsonElement obj) {
+ if (obj.isJsonArray()) {
+ return obj.getAsJsonArray();
+ }
+ JsonArray arr = new JsonArray();
+ arr.add(obj);
+ return arr;
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
deleted file mode 100644
index 597f525f..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import com.google.gson.JsonObject;
-import java.util.Map;
-import java.util.Properties;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Primary;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-
-/**
- * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Configuration
-@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
-@EnableConfigurationProperties
-@EnableScheduling
-@Primary
-public class CloudConfiguration extends AppConfig {
- private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class);
- private ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider;
- private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
- private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration;
- private FtpesConfig ftpesCloudConfiguration;
-
- @Value("#{systemEnvironment}")
- private Properties systemEnvironment;
-
- @Autowired
- public synchronized void setThreadPoolTaskScheduler(
- ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
- this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
- }
-
- /**
- * Reads the cloud configuration.
- */
- public void runTask() {
- Map<String,String> context = MappedDiagnosticContext.initializeTraceContext();
- EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) //
- .subscribeOn(Schedulers.parallel()) //
- .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) //
- .flatMap(this::parseCloudConfig) //
- .subscribe(null, this::onError, this::onComplete);
- }
-
- private void onComplete() {
- logger.trace("Configuration updated");
- }
-
- private void onError(Throwable throwable) {
- logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
- }
-
- private synchronized Mono<CloudConfiguration> parseCloudConfig(JsonObject jsonObject) {
- logger.info("Received application configuration: {}", jsonObject);
- CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
- dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
- dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
- ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig();
- return Mono.just(this);
- }
-
- @Override
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration
- : super.getDmaapPublisherConfiguration();
- }
-
- @Override
- public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration
- : super.getDmaapConsumerConfiguration();
- }
-
- @Override
- public synchronized FtpesConfig getFtpesConfiguration() {
- return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
new file mode 100644
index 00000000..fc9ab204
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
@@ -0,0 +1,105 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+@Gson.TypeAdapters
+public abstract class ConsumerConfiguration {
+ @Value.Redacted
+ public abstract String topicUrl();
+
+ public abstract String trustStorePath();
+
+ public abstract String trustStorePasswordPath();
+
+ public abstract String keyStorePath();
+
+ public abstract String keyStorePasswordPath();
+
+ public abstract Boolean enableDmaapCertAuth();
+
+ public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException {
+ try {
+ URL url = new URL(topicUrl());
+ String passwd = "";
+ String userName = "";
+ if (url.getUserInfo() != null) {
+ String[] userInfo = url.getUserInfo().split(":");
+ userName = userInfo[0];
+ passwd = userInfo[1];
+ }
+ String urlPath = url.getPath();
+ DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+
+ return new ImmutableDmaapConsumerConfiguration.Builder() //
+ .dmaapContentType("application/json") //
+ .dmaapPortNumber(url.getPort()) //
+ .dmaapHostName(url.getHost()) //
+ .dmaapTopicName(path.dmaapTopicName) //
+ .dmaapProtocol(url.getProtocol()) //
+ .dmaapUserName(userName) //
+ .dmaapUserPassword(passwd) //
+ .trustStorePath(this.trustStorePath()) //
+ .trustStorePasswordPath(this.trustStorePasswordPath()) //
+ .keyStorePath(this.keyStorePath()) //
+ .keyStorePasswordPath(this.keyStorePasswordPath()) //
+ .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
+ .consumerId(path.consumerId) //
+ .consumerGroup(path.consumerGroup) //
+ .timeoutMs(-1) //
+ .messageLimit(-1) //
+ .build();
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Could not parse the URL", e);
+ }
+ }
+
+ private class DmaapConsumerUrlPath {
+ final String dmaapTopicName;
+ final String consumerGroup;
+ final String consumerId;
+
+ DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+ this.dmaapTopicName = dmaapTopicName;
+ this.consumerGroup = consumerGroup;
+ this.consumerId = consumerId;
+ }
+ }
+
+ private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException {
+ String[] tokens = urlPath.split("/"); // UrlPath: /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
+ if (tokens.length != 5) {
+ throw new DatafileTaskException("The path has incorrect syntax: " + urlPath);
+ }
+
+ final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // ex. // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
+ final String consumerGroup = tokens[3]; // ex. OpenDcae-c12
+ final String consumerId = tokens[4]; // ex. C12
+ return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 71003f80..62af92a8 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -19,12 +19,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+
import reactor.core.publisher.Mono;
/**
@@ -53,7 +55,7 @@ class EnvironmentProcessor {
} catch (EnvironmentLoaderException e) {
return Mono.error(e);
}
- logger.info("Evaluated environment system variables {}", envProperties);
+ logger.trace("Evaluated environment system variables {}", envProperties);
return Mono.just(envProperties);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
index 3f029359..844699eb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import java.io.Serializable;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.springframework.stereotype.Component;
@@ -28,7 +29,7 @@ import org.springframework.stereotype.Component;
@Component
@Value.Immutable
-@Value.Style(builder = "new")
+@Value.Style(builder = "new", redactedMask = "####")
@Gson.TypeAdapters
public abstract class FtpesConfig implements Serializable {
@@ -38,11 +39,13 @@ public abstract class FtpesConfig implements Serializable {
public abstract String keyCert();
@Value.Parameter
+ @Value.Redacted
public abstract String keyPassword();
@Value.Parameter
public abstract String trustedCa();
@Value.Parameter
+ @Value.Redacted
public abstract String trustedCaPassword();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
new file mode 100644
index 00000000..5576ed26
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
@@ -0,0 +1,74 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+@Gson.TypeAdapters
+public interface PublisherConfiguration {
+
+ String publishUrl();
+
+ String logUrl();
+
+ String userName();
+
+ @Value.Redacted
+ String passWord();
+
+ String trustStorePath();
+
+ String trustStorePasswordPath();
+
+ String keyStorePath();
+
+ String keyStorePasswordPath();
+
+ Boolean enableDmaapCertAuth();
+
+ String changeIdentifier();
+
+ default DmaapPublisherConfiguration toDmaap() throws MalformedURLException {
+ URL url = new URL(publishUrl());
+ String urlPath = url.getPath();
+
+ return new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapContentType("application/octet-stream") //
+ .dmaapPortNumber(url.getPort()) //
+ .dmaapHostName(url.getHost()) //
+ .dmaapTopicName(urlPath) //
+ .dmaapProtocol(url.getProtocol()) //
+ .dmaapUserName(this.userName()) //
+ .dmaapUserPassword(this.passWord()) //
+ .trustStorePath(this.trustStorePath()) //
+ .trustStorePasswordPath(this.trustStorePasswordPath()) //
+ .keyStorePath(this.keyStorePath()) //
+ .keyStorePasswordPath(this.keyStorePasswordPath()) //
+ .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
+ .build();
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index b78e4ae5..5835de1a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,7 +16,6 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import io.swagger.annotations.ApiOperation;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -24,7 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.slf4j.Logger;
@@ -36,6 +37,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
+import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
@@ -49,7 +52,6 @@ import reactor.core.publisher.Mono;
public class SchedulerConfig {
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
- private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
@@ -57,21 +59,21 @@ public class SchedulerConfig {
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
- private final CloudConfiguration cloudConfiguration;
+ private final AppConfig configuration;
/**
* Constructor.
*
* @param taskScheduler The scheduler used to schedule the tasks.
* @param scheduledTasks The scheduler that will actually handle the tasks.
- * @param cloudConfiguration The DFC configuration.
+ * @param configuration The DFC configuration.
*/
@Autowired
public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks,
- CloudConfiguration cloudConfiguration) {
+ AppConfig configuration) {
this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTasks;
- this.cloudConfiguration = cloudConfiguration;
+ this.configuration = configuration;
}
/**
@@ -83,6 +85,7 @@ public class SchedulerConfig {
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
+ configuration.stop();
MDC.setContextMap(contextMap);
logger.info("Stopped Datafile workflow");
MDC.clear();
@@ -99,12 +102,11 @@ public class SchedulerConfig {
public synchronized boolean tryToStartTask() {
contextMap = MappedDiagnosticContext.initializeTraceContext();
logger.info("Start scheduling Datafile workflow");
+ configuration.initialize();
+
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask,
- Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(
- taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
- SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
scheduledFutureList
.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
index 71242265..5a78261a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
@@ -62,7 +62,7 @@ public class SwaggerConfig extends WebMvcConfigurationSupport {
.build();
}
- private ApiInfo apiInfo() {
+ private static ApiInfo apiInfo() {
return new ApiInfoBuilder() //
.title(API_TITLE) //
.description(DESCRIPTION) //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
index cbd67297..847ca46e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
@@ -41,7 +41,7 @@ public class TomcatHttpConfig {
return tomcat;
}
- private Connector getHttpConnector() {
+ private static Connector getHttpConnector() {
Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL);
connector.setScheme("http");
connector.setPort(8100);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 4716fa87..791f0cf1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -71,7 +71,7 @@ public class ScheduleController {
public Mono<ResponseEntity<String>> startTasks() {
return Mono.fromSupplier(schedulerConfig::tryToStartTask) //
- .map(this::createStartTaskResponse);
+ .map(ScheduleController::createStartTaskResponse);
}
/**
@@ -90,7 +90,7 @@ public class ScheduleController {
}
@ApiOperation(value = "Sends success or error response on starting task execution")
- private ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) {
+ private static ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) {
if (wasScheduled) {
return new ResponseEntity<>("Datafile Service has been started!", HttpStatus.CREATED);
} else {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
index 4c49dd8a..72623db2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -17,6 +17,7 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
import java.util.Optional;
+
import org.immutables.value.Value;
/**
@@ -26,11 +27,13 @@ import org.immutables.value.Value;
*
*/
@Value.Immutable
+@Value.Style(redactedMask = "####")
public interface FileServerData {
public String serverAddress();
public String userId();
+ @Value.Redacted
public String password();
public Optional<Integer> port();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index c78ae3a3..bb3016ce 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -128,7 +128,7 @@ public class FtpsClient implements FileCollectClient {
logger.trace("collectFile fetched: {}", localFileName);
}
- private int getPort(Optional<Integer> port) {
+ private static int getPort(Optional<Integer> port) {
return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
}
@@ -180,7 +180,7 @@ public class FtpsClient implements FileCollectClient {
logger.warn("Local file {} already created", localFileName);
}
OutputStream output = new FileOutputStream(localFile);
- logger.debug("File {} opened xNF", localFileName);
+ logger.trace("File {} opened xNF", localFileName);
return output;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index 333be92a..bdaf6d4c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -55,7 +55,7 @@ public class SftpClient implements FileCollectClient {
try {
sftpChannel.get(remoteFile, localFile.toString());
- logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ logger.trace("File {} Download Successfull from xNF", localFile.getFileName());
} catch (SftpException e) {
boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED;
throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry);
@@ -90,11 +90,11 @@ public class SftpClient implements FileCollectClient {
}
}
- private int getPort(Optional<Integer> port) {
+ private static int getPort(Optional<Integer> port) {
return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
}
- private Session setUpSession(FileServerData fileServerData) throws JSchException {
+ private static Session setUpSession(FileServerData fileServerData) throws JSchException {
JSch jsch = new JSch();
Session newSession =
@@ -105,7 +105,7 @@ public class SftpClient implements FileCollectClient {
return newSession;
}
- private ChannelSftp getChannel(Session session) throws JSchException {
+ private static ChannelSftp getChannel(Session session) throws JSchException {
Channel channel = session.openChannel("sftp");
channel.connect();
return (ChannelSftp) channel;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 0a6b669c..36aae949 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -38,6 +38,7 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
*/
@Value.Immutable
@Gson.TypeAdapters
+@Value.Style(redactedMask = "####")
public abstract class FileData {
public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
@@ -53,6 +54,7 @@ public abstract class FileData {
*
* @return the URL to use to fetch the file from the PNF
*/
+ @Value.Redacted
public abstract String location();
/**
@@ -123,7 +125,7 @@ public abstract class FileData {
* @return An <code>Optional</code> containing a String array with the user name and password if given, or an empty
* <code>Optional</code> if not given.
*/
- private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+ private static Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
if (userInfoString != null) {
String[] userAndPassword = userInfoString.split(":");
if (userAndPassword.length == 2) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
index 63ed0daa..5b8c015e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
@@ -33,6 +33,7 @@ import org.immutables.value.Value;
@Value.Immutable
@Gson.TypeAdapters
+@Value.Style(redactedMask = "####")
public interface FilePublishInformation {
@SerializedName("productName")
@@ -54,6 +55,7 @@ public interface FilePublishInformation {
String getTimeZoneOffset();
@SerializedName("location")
+ @Value.Redacted
String getLocation();
@SerializedName("compression")
@@ -70,4 +72,6 @@ public interface FilePublishInformation {
String getName();
Map<String, String> getContext();
+
+ String getChangeIdentifier();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
index 7081d1ac..b8df125b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
@@ -31,12 +31,12 @@ import java.util.Set;
*/
public abstract class JsonSerializer {
+ private JsonSerializer() {}
- private static Gson gson =
- new GsonBuilder() //
- .serializeNulls() //
- .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) //
- .create(); //
+ private static Gson gson = new GsonBuilder() //
+ .serializeNulls() //
+ .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) //
+ .create(); //
/**
* Serializes a <code>filePublishInformation</code>.
@@ -56,9 +56,10 @@ public abstract class JsonSerializer {
private final Set<String> inclusions =
Sets.newHashSet("productName", "vendorName", "lastEpochMicrosec", "sourceName", "startEpochMicrosec",
"timeZoneOffset", "location", "compression", "fileFormatType", "fileFormatVersion");
+
@Override
- public boolean shouldSkipField(FieldAttributes f) {
- return !inclusions.contains(f.getName());
+ public boolean shouldSkipField(FieldAttributes fieldAttributes) {
+ return !inclusions.contains(fieldAttributes.getName());
}
@Override
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 3a3eb3aa..470c4e73 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -22,11 +22,13 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
+
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -37,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -70,7 +73,6 @@ public class JsonMessageParser {
private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
private static final String FILE_READY_CHANGE_TYPE = "FileReady";
- private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
/**
* The data types available in the event name.
@@ -92,7 +94,7 @@ public class JsonMessageParser {
* @return a <code>Flux</code> containing messages.
*/
public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
+ return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData);
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -123,18 +125,17 @@ public class JsonMessageParser {
: getMessagesFromJsonArray(jsonElement);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- logger.trace("original message from message router: {}", message);
+ private static Mono<JsonElement> getJsonParserMessage(String message) {
return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
+ private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
: logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
- private Mono<FileReadyMessage> transformMessages(JsonObject message) {
+ private static Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
@@ -159,7 +160,7 @@ public class JsonMessageParser {
}
- private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
+ private static Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
@@ -182,15 +183,15 @@ public class JsonMessageParser {
.changeIdentifier(changeIdentifier) //
.changeType(changeType) //
.build();
- if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
+ if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
String errorMessage = "VES event parsing.";
if (!missingValues.isEmpty()) {
errorMessage += " Missing data: " + missingValues;
}
- if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) {
- errorMessage += " Change identifier or change type is wrong.";
+ if (!isChangeTypeCorrect(changeType)) {
+ errorMessage += " Change type is wrong: " + changeType + " expected: " + FILE_READY_CHANGE_TYPE;
}
errorMessage += " Message: {}";
logger.error(errorMessage, message);
@@ -198,15 +199,12 @@ public class JsonMessageParser {
}
}
- private boolean isChangeTypeCorrect(String changeType) {
+ private static boolean isChangeTypeCorrect(String changeType) {
return FILE_READY_CHANGE_TYPE.equals(changeType);
}
- private boolean isChangeIdentifierCorrect(String changeIdentifier) {
- return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
- }
-
- private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) {
+ private static List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields,
+ MessageMetaData messageMetaData) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
@@ -219,7 +217,7 @@ public class JsonMessageParser {
return res;
}
- private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
+ private static Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
@@ -250,15 +248,17 @@ public class JsonMessageParser {
}
/**
- * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description},
- * example: Noti_RnNode-Ericsson_FileReady
+ * Gets data from the event name. Defined as:
+ * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
+ * Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
* @param eventName The event name to get the data from.
* @param missingValues List of missing values. The dataType will be added if missing.
* @return String of data from event name
*/
- private String getDataFromEventName(EventNameDataType dataType, String eventName, List<String> missingValues) {
+ private static String getDataFromEventName(EventNameDataType dataType, String eventName,
+ List<String> missingValues) {
String[] eventArray = eventName.split("_|-");
if (eventArray.length >= 4) {
return eventArray[dataType.index];
@@ -269,7 +269,7 @@ public class JsonMessageParser {
return "";
}
- private String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
+ private static String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
if (jsonObject.has(jsonKey)) {
return jsonObject.get(jsonKey).getAsString();
} else {
@@ -278,11 +278,11 @@ public class JsonMessageParser {
}
}
- private boolean containsNotificationFields(JsonObject jsonObject) {
+ private static boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
+ private static Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
logger.error(errorMessage);
return Flux.empty();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 2b3a0ef6..ff267815 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -67,7 +67,7 @@ public class PublishedFileCache {
return publishedFiles.size();
}
- private boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
+ private static boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
final int timeToKeepInfoInSeconds = 60 * 60 * 24;
return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
index 8d433827..198c1bf1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
@@ -27,7 +27,9 @@ import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Future;
+
import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
@@ -44,8 +46,6 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
/**
* Client used to send requests to DataRouter.
@@ -139,22 +139,8 @@ public class DmaapProducerHttpClient {
request.addHeader("Authorization", "Basic " + base64Creds);
}
- /**
- * Gets a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. Specific parts can then be
- * added to the URI by the user.
- *
- * @return a <code>UriBuilder</code> containing the base URI needed talk to DataRouter.
- */
- public UriBuilder getBaseUri() {
- return new DefaultUriBuilderFactory().builder() //
- .scheme(configuration.dmaapProtocol()) //
- .host(configuration.dmaapHostName()) //
- .port(configuration.dmaapPortNumber());
- }
-
private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout,
- Map<String, String> contextMap)
- throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ Map<String, String> contextMap) throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
SSLContext sslContext =
new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index e50ef580..f1d33454 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
@@ -29,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -43,7 +45,7 @@ public class DMaaPMessageConsumer {
private final JsonMessageParser jsonMessageParser;
private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException {
this.jsonMessageParser = new JsonMessageParser();
this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
@@ -69,8 +71,9 @@ public class DMaaPMessageConsumer {
return jsonMessageParser.getMessagesFromJson(message);
}
- private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
- DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
+ private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig)
+ throws DatafileTaskException {
+ DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap();
WebClient client = new DmaapWebClient().fromConfiguration(config).build();
return new DMaaPConsumerReactiveHttpClient(config, client);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index e5dd01e9..1d6baa65 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -25,6 +25,7 @@ import com.google.gson.JsonParser;
import java.io.IOException;
import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
@@ -34,6 +35,8 @@ import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
@@ -46,6 +49,7 @@ import org.slf4j.MDC;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -58,12 +62,9 @@ import reactor.core.publisher.Mono;
public class DataRouterPublisher {
private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
private static final String CONTENT_TYPE = "application/octet-stream";
- private static final String PUBLISH_TOPIC = "publish";
- private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
- private DmaapProducerHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -80,7 +81,6 @@ public class DataRouterPublisher {
public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries,
Duration firstBackoff) {
MDC.setContextMap(publishInfo.getContext());
- dmaapProducerReactiveHttpClient = resolveClient();
return Mono.just(publishInfo) //
.cache() //
.flatMap(this::publishFile) //
@@ -92,13 +92,14 @@ public class DataRouterPublisher {
MDC.setContextMap(publishInfo.getContext());
logger.trace("Entering publishFile with {}", publishInfo);
try {
+ DmaapProducerHttpClient dmaapProducerHttpClient = resolveClient(publishInfo.getChangeIdentifier());
HttpPut put = new HttpPut();
prepareHead(publishInfo, put);
prepareBody(publishInfo, put);
- dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+ dmaapProducerHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
- dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
+ dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
@@ -107,11 +108,18 @@ public class DataRouterPublisher {
}
}
- private void prepareHead(FilePublishInformation publishInfo, HttpPut put) {
+ private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException {
+
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getPublishUri(publishInfo.getName()));
+ URI uri = new DefaultUriBuilderFactory(
+ datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) //
+ .builder() //
+ .pathSegment(publishInfo.getName()) //
+ .build();
+ put.setURI(uri);
+
MappedDiagnosticContext.appendTraceInfo(put);
}
@@ -122,14 +130,7 @@ public class DataRouterPublisher {
}
}
- private URI getPublishUri(String fileName) {
- return dmaapProducerReactiveHttpClient.getBaseUri() //
- .pathSegment(PUBLISH_TOPIC) //
- .pathSegment(DEFAULT_FEED_ID) //
- .pathSegment(fileName).build();
- }
-
- private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
+ private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
@@ -145,11 +146,17 @@ public class DataRouterPublisher {
return realResource.getInputStream();
}
- DmaapPublisherConfiguration resolveConfiguration() {
- return datafileAppConfig.getDmaapPublisherConfiguration();
+ PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException {
+ return datafileAppConfig.getPublisherConfiguration(changeIdentifer);
}
- DmaapProducerHttpClient resolveClient() {
- return new DmaapProducerHttpClient(resolveConfiguration());
+ DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException {
+ try {
+ DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap();
+ return new DmaapProducerHttpClient(cfg);
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Cannot resolve producer client", e);
+ }
+
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 0c62795e..6ddcb541 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -74,12 +74,12 @@ public class FileCollector {
return Mono.just(fileData) //
.cache() //
- .flatMap(fd -> collectFile(fileData, contextMap)) //
+ .flatMap(fd -> tryCollectFile(fileData, contextMap)) //
.retryBackoff(numRetries, firstBackoff) //
- .flatMap(this::checkCollectedFile);
+ .flatMap(FileCollector::checkCollectedFile);
}
- private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
+ private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
if (info.isPresent()) {
return Mono.just(info.get());
} else {
@@ -88,7 +88,7 @@ public class FileCollector {
}
}
- private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) {
+ private Mono<Optional<FilePublishInformation>> tryCollectFile(FileData fileData, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("starting to collectFile {}", fileData.name());
@@ -110,7 +110,7 @@ public class FileCollector {
}
} catch (Exception throwable) {
logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
- throwable.toString());
+ throwable.toString(), throwable);
return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
}
}
@@ -126,7 +126,7 @@ public class FileCollector {
}
}
- private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
+ private static FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
Map<String, String> context) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
@@ -143,6 +143,7 @@ public class FileCollector {
.compression(fileData.compression()) //
.fileFormatType(fileData.fileFormatType()) //
.fileFormatVersion(fileData.fileFormatVersion()) //
+ .changeIdentifier(fileData.messageMetaData().changeIdentifier()) //
.context(context) //
.build();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index e18da248..4d8d679d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -21,18 +21,23 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Map;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -44,12 +49,9 @@ import org.slf4j.MDC;
*
*/
public class PublishedChecker {
- private static final String FEEDLOG_TOPIC = "feedlog";
- private static final String DEFAULT_FEED_ID = "1";
- private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
+ private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
private final AppConfig appConfig;
/**
@@ -66,19 +68,24 @@ public class PublishedChecker {
*
* @param fileName the name of the file used when it is published.
*
- * @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
+ * @return <code>true</code> if the file has been published before, <code>false</code>
+ * otherwise.
+ * @throws DatafileTaskException if the check fails
*/
- public boolean isFilePublished(String fileName, Map<String, String> contextMap) {
+ public boolean isFilePublished(String fileName, String changeIdentifier, Map<String, String> contextMap)
+ throws DatafileTaskException {
MDC.setContextMap(contextMap);
- DmaapProducerHttpClient producerClient = resolveClient();
+ PublisherConfiguration publisherConfig = resolveConfiguration(changeIdentifier);
+
+ DmaapProducerHttpClient producerClient = resolveClient(publisherConfig);
HttpGet getRequest = new HttpGet();
MappedDiagnosticContext.appendTraceInfo(getRequest);
- getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
- producerClient.addUserCredentialsToHead(getRequest);
-
try {
+ getRequest.setURI(getPublishedQueryUri(fileName, publisherConfig));
+ producerClient.addUserCredentialsToHead(getRequest);
+
HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest,
WEB_CLIENT_TIMEOUT, contextMap);
@@ -95,20 +102,23 @@ public class PublishedChecker {
}
}
- private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) {
- return producerClient.getBaseUri() //
- .pathSegment(FEEDLOG_TOPIC) //
- .pathSegment(DEFAULT_FEED_ID) //
- .queryParam("type", "pub") //
- .queryParam("filename", fileName) //
+ private static URI getPublishedQueryUri(String fileName, PublisherConfiguration config) throws URISyntaxException {
+ return new URIBuilder(config.logUrl()) //
+ .addParameter("type", "pub") //
+ .addParameter("filename", fileName) //
.build();
}
- protected DmaapPublisherConfiguration resolveConfiguration() {
- return appConfig.getDmaapPublisherConfiguration();
+ protected PublisherConfiguration resolveConfiguration(String changeIdentifier) throws DatafileTaskException {
+ return appConfig.getPublisherConfiguration(changeIdentifier);
}
- protected DmaapProducerHttpClient resolveClient() {
- return new DmaapProducerHttpClient(resolveConfiguration());
+ protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig)
+ throws DatafileTaskException {
+ try {
+ return new DmaapProducerHttpClient(publisherConfig.toDmaap());
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Cannot create published checker client", e);
+ }
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index b5fa0c24..bac52659 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -86,13 +87,16 @@ public class ScheduledTasks {
threadPoolQueueSize.get());
return;
}
+ if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) {
+ logger.warn("No configuration loaded, skipping polling for messages");
+ return;
+ }
currentNumberOfSubscriptions.incrementAndGet();
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
- applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
- .subscribe(this::onSuccess, //
+ .subscribe(ScheduledTasks::onSuccess, //
throwable -> {
onError(throwable, context);
currentNumberOfSubscriptions.decrementAndGet();
@@ -115,6 +119,7 @@ public class ScheduledTasks {
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> createMdcContext(fileData, context)) //
+ .filter(this::isFeedConfigured) //
.filter(this::shouldBePublished) //
.flatMap(this::fetchFile, false, 1, 1) //
.flatMap(this::publishToDataRouter, false, 1, 1) //
@@ -124,13 +129,13 @@ public class ScheduledTasks {
}
private class FileDataWithContext {
- FileDataWithContext(FileData fileData, Map<String, String> context) {
+ public final FileData fileData;
+ public final Map<String, String> context;
+
+ public FileDataWithContext(FileData fileData, Map<String, String> context) {
this.fileData = fileData;
this.context = context;
}
-
- final FileData fileData;
- final Map<String, String> context;
}
/**
@@ -160,7 +165,7 @@ public class ScheduledTasks {
return this.threadPoolQueueSize.get();
}
- protected DMaaPMessageConsumer createConsumerTask() {
+ protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
@@ -172,17 +177,17 @@ public class ScheduledTasks {
return new DataRouterPublisher(applicationConfiguration);
}
- private void onComplete(Map<String, String> contextMap) {
+ private static void onComplete(Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.trace("Datafile tasks have been completed");
}
- private synchronized void onSuccess(FilePublishInformation publishInfo) {
+ private static synchronized void onSuccess(FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
logger.info("Datafile file published {}", publishInfo.getInternalLocation());
}
- private void onError(Throwable throwable, Map<String, String> context) {
+ private static void onError(Throwable throwable, Map<String, String> context) {
MDC.setContextMap(context);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
@@ -194,11 +199,26 @@ public class ScheduledTasks {
return Mono.just(pair);
}
+ private boolean isFeedConfigured(FileDataWithContext fileData) {
+ if (applicationConfiguration.isFeedConfigured(fileData.fileData.messageMetaData().changeIdentifier())) {
+ return true;
+ } else {
+ logger.info("No feed is configured for: {}, file ignored: {}",
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name());
+ return false;
+ }
+ }
+
private boolean shouldBePublished(FileDataWithContext fileData) {
boolean result = false;
Path localFilePath = fileData.fileData.getLocalFilePath();
if (publishedFilesCache.put(localFilePath) == null) {
- result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context);
+ try {
+ result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
+ } catch (DatafileTaskException e) {
+ logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
+ }
}
if (!result) {
currentNumberOfTasks.decrementAndGet();
@@ -248,13 +268,19 @@ public class ScheduledTasks {
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
logger.info(
- "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}",
+ "Consuming new file ready messages, current number of tasks: {}, published files: {}, "
+ + "number of subscriptions: {}",
getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
Map<String, String> context = MDC.getCopyOfContextMap();
- return createConsumerTask() //
- .getMessageRouterResponse() //
- .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ try {
+ return createConsumerTask() //
+ .getMessageRouterResponse() //
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ } catch (Exception e) {
+ logger.error("Could not create message consumer task", e);
+ return Flux.empty();
+ }
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
@@ -264,7 +290,7 @@ public class ScheduledTasks {
return Flux.empty();
}
- private void deleteFile(Path localFile, Map<String, String> context) {
+ private static void deleteFile(Path localFile, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("Deleting file: {}", localFile);
try {