aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-05-29 09:39:53 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-05-29 09:39:53 +0000
commit4229afc64d82cbd1ea1e43c92fcd6c9bed9e5137 (patch)
tree6c5c6efecb383a439f27cd163b20edfa364b6ce2 /datafile-app-server/src/main/java
parent2187fc6e6b4b4783389289fb87bd6a3676ad2c72 (diff)
Generalizing Data File Collection to handle any type of file
Extension of the DFC to be able to handle any file types types, which are published on different DR feeds. This association between file type and DR feed is defined by configuration. The file type is defined by the changeIdentifier in the fileReady VES message reported from the PNF. The creation of DR feeds and configuration will be done by the DMAAP plugin, but that is not tested yet. Change-Id: I13b36acd926a6941ee733e6b37922049fb54a5d9 Issue-ID: DCAEGEN2-1532 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src/main/java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java211
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java141
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java110
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java105
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java5
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java74
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java24
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java3
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java46
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java20
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java9
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java52
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java58
25 files changed, 610 insertions, 357 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index a38eab8f..d324ca99 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -22,22 +22,38 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
import java.util.ServiceLoader;
+
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
/**
* Holds all configuration for the DFC.
*
@@ -46,105 +62,174 @@ import org.springframework.stereotype.Component;
*/
@Component
+@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
@EnableConfigurationProperties
@ConfigurationProperties("app")
public class AppConfig {
-
- private static final String CONFIG = "configs";
- private static final String DMAAP = "dmaap";
- private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
- private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
- private static final String FTP = "ftp";
- private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
- private static final String SECURITY = "security";
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
- private DmaapConsumerConfiguration dmaapConsumerConfiguration;
- private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private ConsumerConfiguration dmaapConsumerConfiguration;
+ private Map<String, PublisherConfiguration> publishingConfiguration;
private FtpesConfig ftpesConfiguration;
+ private CloudConfigurationProvider cloudConfigurationProvider;
+ @Value("#{systemEnvironment}")
+ Properties systemEnvironment;
+ private Disposable refreshConfigTask = null;
@NotEmpty
private String filepath;
- public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ @Autowired
+ public synchronized void setCloudConfigurationProvider(
+ CloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ this.cloudConfigurationProvider = reactiveCloudConfigurationProvider;
+ }
+
+ public synchronized void setFilepath(String filepath) {
+ this.filepath = filepath;
+ }
+
+ /**
+ * Reads the cloud configuration.
+ */
+ public void initialize() {
+ stop();
+ Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
+ loadConfigurationFromFile();
+
+ refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5))
+ .flatMap(count -> createRefreshConfigurationTask(count, context))
+ .subscribe(e -> logger.info("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ () -> logger.error("Configuration refresh terminated"));
+ }
+
+ public void stop() {
+ if (refreshConfigTask != null) {
+ refreshConfigTask.dispose();
+ refreshConfigTask = null;
+ }
+ }
+
+ public synchronized ConsumerConfiguration getDmaapConsumerConfiguration() {
return dmaapConsumerConfiguration;
}
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherConfiguration;
+ public synchronized boolean isFeedConfigured(String changeIdentifier) {
+ return publishingConfiguration.containsKey(changeIdentifier);
+ }
+
+ public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier)
+ throws DatafileTaskException {
+
+ if (publishingConfiguration == null) {
+ throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier);
+ }
+ PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier);
+ if (cfg == null) {
+ throw new DatafileTaskException(
+ "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier);
+ }
+ return cfg;
}
public synchronized FtpesConfig getFtpesConfiguration() {
return ftpesConfiguration;
}
+ Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) {
+ return Flux.just(counter) //
+ .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) //
+ .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) //
+ .flatMap(this::fetchConfiguration);
+ }
+
+ Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context)
+ .onErrorResume(AppConfig::onErrorResume);
+ }
+
+ private static <R> Mono<R> onErrorResume(Throwable trowable) {
+ logger.error("Could not refresh application configuration {}", trowable.toString());
+ return Mono.empty();
+ }
+
+ private Mono<AppConfig> fetchConfiguration(EnvProperties env) {
+ Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) //
+ .onErrorResume(AppConfig::onErrorResume);
+
+ // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the
+ // other ones does not work
+ EnvProperties dmaapEnv = ImmutableEnvProperties.builder() //
+ .consulHost(env.consulHost()) //
+ .consulPort(env.consulPort()) //
+ .cbsName(env.cbsName()) //
+ .appName(env.appName() + ":dmaap") //
+ .build(); //
+ Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv)
+ .onErrorResume(t -> Mono.just(new JsonObject()));
+
+ return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) //
+ .onErrorResume(AppConfig::onErrorResume);
+ }
+
+ /**
+ * parse configuration
+ *
+ * @param serviceConfigRootObject
+ * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken
+ * from the serviceConfigRootObject
+ * @return this which is updated if successful
+ */
+ private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) {
+ try {
+ CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
+ setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(),
+ parser.getFtpesConfig());
+ } catch (DatafileTaskException e) {
+ logger.error("Could not parse configuration {}", e.toString(), e);
+ }
+ return this;
+ }
+
/**
* Reads the configuration from file.
*/
- public void loadConfigurationFromFile() {
+ void loadConfigurationFromFile() {
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- JsonParser parser = new JsonParser();
- JsonObject jsonObject;
+
try (InputStream inputStream = createInputStream(filepath)) {
- JsonElement rootElement = getJsonElement(parser, inputStream);
- if (rootElement.isJsonObject()) {
- jsonObject = rootElement.getAsJsonObject();
- FtpesConfig ftpesConfig = deserializeType(gsonBuilder,
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
- FtpesConfig.class);
- DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder,
- concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_CONSUMER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapConsumerConfiguration.class);
-
- DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder,
- concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_PRODUCER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapPublisherConfiguration.class);
-
- setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
+ JsonParser parser = new JsonParser();
+ JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject();
+ if (rootObject == null) {
+ throw new JsonSyntaxException("Root is not a json object");
}
+ parseCloudConfig(rootObject, rootObject);
} catch (JsonSyntaxException | IOException e) {
- logger.error("Problem with loading configuration, file: {}", filepath, e);
+ logger.warn("Local configuration file not loaded: {}", filepath, e);
}
}
- synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration,
- DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) {
- this.dmaapConsumerConfiguration = consumerConfiguration;
- this.dmaapPublisherConfiguration = publisherConfiguration;
- this.ftpesConfiguration = ftpesConfig;
+ private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
+ Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
+ if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
+ logger.error(
+ "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
+ consumerConfiguration, publisherConfiguration, ftpesConfig);
+ } else {
+ this.dmaapConsumerConfiguration = consumerConfiguration;
+ this.publishingConfiguration = publisherConfiguration;
+ this.ftpesConfiguration = ftpesConfig;
+ }
}
JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
return parser.parse(new InputStreamReader(inputStream));
}
- private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
- @NotNull Class<T> type) {
- return gsonBuilder.create().fromJson(jsonObject, type);
- }
-
InputStream createInputStream(@NotNull String filepath) throws IOException {
return new BufferedInputStream(new FileInputStream(filepath));
}
- synchronized String getFilepath() {
- return this.filepath;
- }
-
- public synchronized void setFilepath(String filepath) {
- this.filepath = filepath;
- }
-
- private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
- source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue()));
- return target;
- }
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index 6b7860c4..3ac6b2c6 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -18,11 +18,17 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
@@ -32,63 +38,106 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class CloudConfigParser {
-
private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
- private final JsonObject jsonObject;
+ private final JsonObject serviceConfigurationRoot;
+ private final JsonObject dmaapConfigurationRoot;
- CloudConfigParser(JsonObject jsonObject) {
- this.jsonObject = jsonObject;
+ public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) {
+ this.serviceConfigurationRoot = serviceConfigurationRoot;
+ this.dmaapConfigurationRoot = dmaapConfigurationRoot;
}
- DmaapPublisherConfiguration getDmaapPublisherConfig() {
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
- .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
- .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
- .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
- .build();
+ public Map<String, PublisherConfiguration> getDmaapPublisherConfig() throws DatafileTaskException {
+ Iterator<JsonElement> producerCfgs =
+ toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator();
+
+ Map<String, PublisherConfiguration> result = new HashMap<>();
+
+ while (producerCfgs.hasNext()) {
+ JsonObject producerCfg = producerCfgs.next().getAsJsonObject();
+ String feedName = getAsString(producerCfg, "feedName");
+ JsonObject feedConfig = getFeedConfig(feedName);
+
+ PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
+ .publishUrl(getAsString(feedConfig, "publish_url")) //
+ .passWord(getAsString(feedConfig, "password")) //
+ .userName(getAsString(feedConfig, "username")) //
+ .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) //
+ .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
+ .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) //
+ .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
+ .enableDmaapCertAuth(
+ get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) //
+ .logUrl(getAsString(feedConfig, "log_url")) //
+ .build();
+
+ result.put(cfg.changeIdentifier(), cfg);
+ }
+ return result;
}
- DmaapConsumerConfiguration getDmaapConsumerConfig() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt())
- .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
- .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
- .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
- .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
- .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
- .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
+ JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject();
+ Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
+ if (topics.size() != 1) {
+ throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics);
+ }
+ JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
+ JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject();
+ String topicUrl = getAsString(dmaapInfo, "topic_url");
+
+ return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
+ .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH))
+ .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
+ .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH))
+ .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
+ .enableDmaapCertAuth(
+ get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
- FtpesConfig getFtpesConfig() {
+ public FtpesConfig getFtpesConfig() throws DatafileTaskException {
return new ImmutableFtpesConfig.Builder() //
- .keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString())
- .keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString())
- .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString())
- .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) //
+ .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert"))
+ .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword"))
+ .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa"))
+ .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) //
.build();
}
+
+ private static JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException {
+ JsonElement elem = obj.get(memberName);
+ if (elem == null) {
+ throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj);
+ }
+ return elem;
+ }
+
+ private static String getAsString(JsonObject obj, String memberName) throws DatafileTaskException {
+ return get(obj, memberName).getAsString();
+ }
+
+ private JsonObject getFeedConfig(String feedName) throws DatafileTaskException {
+ JsonElement elem = dmaapConfigurationRoot.get(feedName);
+ if (elem == null) {
+ elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under
+ // serviceConfigurationRoot
+ }
+ return elem.getAsJsonObject();
+ }
+
+ private static JsonArray toArray(JsonElement obj) {
+ if (obj.isJsonArray()) {
+ return obj.getAsJsonArray();
+ }
+ JsonArray arr = new JsonArray();
+ arr.add(obj);
+ return arr;
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
deleted file mode 100644
index 597f525f..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import com.google.gson.JsonObject;
-import java.util.Map;
-import java.util.Properties;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Primary;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-
-/**
- * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Configuration
-@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
-@EnableConfigurationProperties
-@EnableScheduling
-@Primary
-public class CloudConfiguration extends AppConfig {
- private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class);
- private ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider;
- private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
- private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration;
- private FtpesConfig ftpesCloudConfiguration;
-
- @Value("#{systemEnvironment}")
- private Properties systemEnvironment;
-
- @Autowired
- public synchronized void setThreadPoolTaskScheduler(
- ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
- this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
- }
-
- /**
- * Reads the cloud configuration.
- */
- public void runTask() {
- Map<String,String> context = MappedDiagnosticContext.initializeTraceContext();
- EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) //
- .subscribeOn(Schedulers.parallel()) //
- .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) //
- .flatMap(this::parseCloudConfig) //
- .subscribe(null, this::onError, this::onComplete);
- }
-
- private void onComplete() {
- logger.trace("Configuration updated");
- }
-
- private void onError(Throwable throwable) {
- logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
- }
-
- private synchronized Mono<CloudConfiguration> parseCloudConfig(JsonObject jsonObject) {
- logger.info("Received application configuration: {}", jsonObject);
- CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
- dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
- dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
- ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig();
- return Mono.just(this);
- }
-
- @Override
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration
- : super.getDmaapPublisherConfiguration();
- }
-
- @Override
- public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration
- : super.getDmaapConsumerConfiguration();
- }
-
- @Override
- public synchronized FtpesConfig getFtpesConfiguration() {
- return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
new file mode 100644
index 00000000..fc9ab204
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
@@ -0,0 +1,105 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+@Gson.TypeAdapters
+public abstract class ConsumerConfiguration {
+ @Value.Redacted
+ public abstract String topicUrl();
+
+ public abstract String trustStorePath();
+
+ public abstract String trustStorePasswordPath();
+
+ public abstract String keyStorePath();
+
+ public abstract String keyStorePasswordPath();
+
+ public abstract Boolean enableDmaapCertAuth();
+
+ public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException {
+ try {
+ URL url = new URL(topicUrl());
+ String passwd = "";
+ String userName = "";
+ if (url.getUserInfo() != null) {
+ String[] userInfo = url.getUserInfo().split(":");
+ userName = userInfo[0];
+ passwd = userInfo[1];
+ }
+ String urlPath = url.getPath();
+ DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+
+ return new ImmutableDmaapConsumerConfiguration.Builder() //
+ .dmaapContentType("application/json") //
+ .dmaapPortNumber(url.getPort()) //
+ .dmaapHostName(url.getHost()) //
+ .dmaapTopicName(path.dmaapTopicName) //
+ .dmaapProtocol(url.getProtocol()) //
+ .dmaapUserName(userName) //
+ .dmaapUserPassword(passwd) //
+ .trustStorePath(this.trustStorePath()) //
+ .trustStorePasswordPath(this.trustStorePasswordPath()) //
+ .keyStorePath(this.keyStorePath()) //
+ .keyStorePasswordPath(this.keyStorePasswordPath()) //
+ .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
+ .consumerId(path.consumerId) //
+ .consumerGroup(path.consumerGroup) //
+ .timeoutMs(-1) //
+ .messageLimit(-1) //
+ .build();
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Could not parse the URL", e);
+ }
+ }
+
+ private class DmaapConsumerUrlPath {
+ final String dmaapTopicName;
+ final String consumerGroup;
+ final String consumerId;
+
+ DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+ this.dmaapTopicName = dmaapTopicName;
+ this.consumerGroup = consumerGroup;
+ this.consumerId = consumerId;
+ }
+ }
+
+ private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException {
+ String[] tokens = urlPath.split("/"); // UrlPath: /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
+ if (tokens.length != 5) {
+ throw new DatafileTaskException("The path has incorrect syntax: " + urlPath);
+ }
+
+ final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // ex. // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
+ final String consumerGroup = tokens[3]; // ex. OpenDcae-c12
+ final String consumerId = tokens[4]; // ex. C12
+ return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 71003f80..62af92a8 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -19,12 +19,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+
import reactor.core.publisher.Mono;
/**
@@ -53,7 +55,7 @@ class EnvironmentProcessor {
} catch (EnvironmentLoaderException e) {
return Mono.error(e);
}
- logger.info("Evaluated environment system variables {}", envProperties);
+ logger.trace("Evaluated environment system variables {}", envProperties);
return Mono.just(envProperties);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
index 3f029359..844699eb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import java.io.Serializable;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.springframework.stereotype.Component;
@@ -28,7 +29,7 @@ import org.springframework.stereotype.Component;
@Component
@Value.Immutable
-@Value.Style(builder = "new")
+@Value.Style(builder = "new", redactedMask = "####")
@Gson.TypeAdapters
public abstract class FtpesConfig implements Serializable {
@@ -38,11 +39,13 @@ public abstract class FtpesConfig implements Serializable {
public abstract String keyCert();
@Value.Parameter
+ @Value.Redacted
public abstract String keyPassword();
@Value.Parameter
public abstract String trustedCa();
@Value.Parameter
+ @Value.Redacted
public abstract String trustedCaPassword();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
new file mode 100644
index 00000000..5576ed26
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
@@ -0,0 +1,74 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+@Gson.TypeAdapters
+public interface PublisherConfiguration {
+
+ String publishUrl();
+
+ String logUrl();
+
+ String userName();
+
+ @Value.Redacted
+ String passWord();
+
+ String trustStorePath();
+
+ String trustStorePasswordPath();
+
+ String keyStorePath();
+
+ String keyStorePasswordPath();
+
+ Boolean enableDmaapCertAuth();
+
+ String changeIdentifier();
+
+ default DmaapPublisherConfiguration toDmaap() throws MalformedURLException {
+ URL url = new URL(publishUrl());
+ String urlPath = url.getPath();
+
+ return new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapContentType("application/octet-stream") //
+ .dmaapPortNumber(url.getPort()) //
+ .dmaapHostName(url.getHost()) //
+ .dmaapTopicName(urlPath) //
+ .dmaapProtocol(url.getProtocol()) //
+ .dmaapUserName(this.userName()) //
+ .dmaapUserPassword(this.passWord()) //
+ .trustStorePath(this.trustStorePath()) //
+ .trustStorePasswordPath(this.trustStorePasswordPath()) //
+ .keyStorePath(this.keyStorePath()) //
+ .keyStorePasswordPath(this.keyStorePasswordPath()) //
+ .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
+ .build();
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index b78e4ae5..5835de1a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,7 +16,6 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import io.swagger.annotations.ApiOperation;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -24,7 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.slf4j.Logger;
@@ -36,6 +37,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
+import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
@@ -49,7 +52,6 @@ import reactor.core.publisher.Mono;
public class SchedulerConfig {
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
- private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
@@ -57,21 +59,21 @@ public class SchedulerConfig {
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
- private final CloudConfiguration cloudConfiguration;
+ private final AppConfig configuration;
/**
* Constructor.
*
* @param taskScheduler The scheduler used to schedule the tasks.
* @param scheduledTasks The scheduler that will actually handle the tasks.
- * @param cloudConfiguration The DFC configuration.
+ * @param configuration The DFC configuration.
*/
@Autowired
public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks,
- CloudConfiguration cloudConfiguration) {
+ AppConfig configuration) {
this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTasks;
- this.cloudConfiguration = cloudConfiguration;
+ this.configuration = configuration;
}
/**
@@ -83,6 +85,7 @@ public class SchedulerConfig {
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
+ configuration.stop();
MDC.setContextMap(contextMap);
logger.info("Stopped Datafile workflow");
MDC.clear();
@@ -99,12 +102,11 @@ public class SchedulerConfig {
public synchronized boolean tryToStartTask() {
contextMap = MappedDiagnosticContext.initializeTraceContext();
logger.info("Start scheduling Datafile workflow");
+ configuration.initialize();
+
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask,
- Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(
- taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
- SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
scheduledFutureList
.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
index 71242265..5a78261a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
@@ -62,7 +62,7 @@ public class SwaggerConfig extends WebMvcConfigurationSupport {
.build();
}
- private ApiInfo apiInfo() {
+ private static ApiInfo apiInfo() {
return new ApiInfoBuilder() //
.title(API_TITLE) //
.description(DESCRIPTION) //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
index cbd67297..847ca46e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
@@ -41,7 +41,7 @@ public class TomcatHttpConfig {
return tomcat;
}
- private Connector getHttpConnector() {
+ private static Connector getHttpConnector() {
Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL);
connector.setScheme("http");
connector.setPort(8100);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 4716fa87..791f0cf1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -71,7 +71,7 @@ public class ScheduleController {
public Mono<ResponseEntity<String>> startTasks() {
return Mono.fromSupplier(schedulerConfig::tryToStartTask) //
- .map(this::createStartTaskResponse);
+ .map(ScheduleController::createStartTaskResponse);
}
/**
@@ -90,7 +90,7 @@ public class ScheduleController {
}
@ApiOperation(value = "Sends success or error response on starting task execution")
- private ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) {
+ private static ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) {
if (wasScheduled) {
return new ResponseEntity<>("Datafile Service has been started!", HttpStatus.CREATED);
} else {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
index 4c49dd8a..72623db2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -17,6 +17,7 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
import java.util.Optional;
+
import org.immutables.value.Value;
/**
@@ -26,11 +27,13 @@ import org.immutables.value.Value;
*
*/
@Value.Immutable
+@Value.Style(redactedMask = "####")
public interface FileServerData {
public String serverAddress();
public String userId();
+ @Value.Redacted
public String password();
public Optional<Integer> port();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index c78ae3a3..bb3016ce 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -128,7 +128,7 @@ public class FtpsClient implements FileCollectClient {
logger.trace("collectFile fetched: {}", localFileName);
}
- private int getPort(Optional<Integer> port) {
+ private static int getPort(Optional<Integer> port) {
return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
}
@@ -180,7 +180,7 @@ public class FtpsClient implements FileCollectClient {
logger.warn("Local file {} already created", localFileName);
}
OutputStream output = new FileOutputStream(localFile);
- logger.debug("File {} opened xNF", localFileName);
+ logger.trace("File {} opened xNF", localFileName);
return output;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index 333be92a..bdaf6d4c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -55,7 +55,7 @@ public class SftpClient implements FileCollectClient {
try {
sftpChannel.get(remoteFile, localFile.toString());
- logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ logger.trace("File {} Download Successfull from xNF", localFile.getFileName());
} catch (SftpException e) {
boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED;
throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry);
@@ -90,11 +90,11 @@ public class SftpClient implements FileCollectClient {
}
}
- private int getPort(Optional<Integer> port) {
+ private static int getPort(Optional<Integer> port) {
return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
}
- private Session setUpSession(FileServerData fileServerData) throws JSchException {
+ private static Session setUpSession(FileServerData fileServerData) throws JSchException {
JSch jsch = new JSch();
Session newSession =
@@ -105,7 +105,7 @@ public class SftpClient implements FileCollectClient {
return newSession;
}
- private ChannelSftp getChannel(Session session) throws JSchException {
+ private static ChannelSftp getChannel(Session session) throws JSchException {
Channel channel = session.openChannel("sftp");
channel.connect();
return (ChannelSftp) channel;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 0a6b669c..36aae949 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -38,6 +38,7 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
*/
@Value.Immutable
@Gson.TypeAdapters
+@Value.Style(redactedMask = "####")
public abstract class FileData {
public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
@@ -53,6 +54,7 @@ public abstract class FileData {
*
* @return the URL to use to fetch the file from the PNF
*/
+ @Value.Redacted
public abstract String location();
/**
@@ -123,7 +125,7 @@ public abstract class FileData {
* @return An <code>Optional</code> containing a String array with the user name and password if given, or an empty
* <code>Optional</code> if not given.
*/
- private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+ private static Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
if (userInfoString != null) {
String[] userAndPassword = userInfoString.split(":");
if (userAndPassword.length == 2) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
index 63ed0daa..5b8c015e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
@@ -33,6 +33,7 @@ import org.immutables.value.Value;
@Value.Immutable
@Gson.TypeAdapters
+@Value.Style(redactedMask = "####")
public interface FilePublishInformation {
@SerializedName("productName")
@@ -54,6 +55,7 @@ public interface FilePublishInformation {
String getTimeZoneOffset();
@SerializedName("location")
+ @Value.Redacted
String getLocation();
@SerializedName("compression")
@@ -70,4 +72,6 @@ public interface FilePublishInformation {
String getName();
Map<String, String> getContext();
+
+ String getChangeIdentifier();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
index 7081d1ac..b8df125b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
@@ -31,12 +31,12 @@ import java.util.Set;
*/
public abstract class JsonSerializer {
+ private JsonSerializer() {}
- private static Gson gson =
- new GsonBuilder() //
- .serializeNulls() //
- .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) //
- .create(); //
+ private static Gson gson = new GsonBuilder() //
+ .serializeNulls() //
+ .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) //
+ .create(); //
/**
* Serializes a <code>filePublishInformation</code>.
@@ -56,9 +56,10 @@ public abstract class JsonSerializer {
private final Set<String> inclusions =
Sets.newHashSet("productName", "vendorName", "lastEpochMicrosec", "sourceName", "startEpochMicrosec",
"timeZoneOffset", "location", "compression", "fileFormatType", "fileFormatVersion");
+
@Override
- public boolean shouldSkipField(FieldAttributes f) {
- return !inclusions.contains(f.getName());
+ public boolean shouldSkipField(FieldAttributes fieldAttributes) {
+ return !inclusions.contains(fieldAttributes.getName());
}
@Override
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 3a3eb3aa..470c4e73 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -22,11 +22,13 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
+
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -37,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -70,7 +73,6 @@ public class JsonMessageParser {
private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
private static final String FILE_READY_CHANGE_TYPE = "FileReady";
- private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
/**
* The data types available in the event name.
@@ -92,7 +94,7 @@ public class JsonMessageParser {
* @return a <code>Flux</code> containing messages.
*/
public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
+ return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData);
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -123,18 +125,17 @@ public class JsonMessageParser {
: getMessagesFromJsonArray(jsonElement);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- logger.trace("original message from message router: {}", message);
+ private static Mono<JsonElement> getJsonParserMessage(String message) {
return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
+ private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
: logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
- private Mono<FileReadyMessage> transformMessages(JsonObject message) {
+ private static Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
@@ -159,7 +160,7 @@ public class JsonMessageParser {
}
- private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
+ private static Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
@@ -182,15 +183,15 @@ public class JsonMessageParser {
.changeIdentifier(changeIdentifier) //
.changeType(changeType) //
.build();
- if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
+ if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
String errorMessage = "VES event parsing.";
if (!missingValues.isEmpty()) {
errorMessage += " Missing data: " + missingValues;
}
- if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) {
- errorMessage += " Change identifier or change type is wrong.";
+ if (!isChangeTypeCorrect(changeType)) {
+ errorMessage += " Change type is wrong: " + changeType + " expected: " + FILE_READY_CHANGE_TYPE;
}
errorMessage += " Message: {}";
logger.error(errorMessage, message);
@@ -198,15 +199,12 @@ public class JsonMessageParser {
}
}
- private boolean isChangeTypeCorrect(String changeType) {
+ private static boolean isChangeTypeCorrect(String changeType) {
return FILE_READY_CHANGE_TYPE.equals(changeType);
}
- private boolean isChangeIdentifierCorrect(String changeIdentifier) {
- return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
- }
-
- private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) {
+ private static List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields,
+ MessageMetaData messageMetaData) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
@@ -219,7 +217,7 @@ public class JsonMessageParser {
return res;
}
- private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
+ private static Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
@@ -250,15 +248,17 @@ public class JsonMessageParser {
}
/**
- * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description},
- * example: Noti_RnNode-Ericsson_FileReady
+ * Gets data from the event name. Defined as:
+ * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
+ * Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
* @param eventName The event name to get the data from.
* @param missingValues List of missing values. The dataType will be added if missing.
* @return String of data from event name
*/
- private String getDataFromEventName(EventNameDataType dataType, String eventName, List<String> missingValues) {
+ private static String getDataFromEventName(EventNameDataType dataType, String eventName,
+ List<String> missingValues) {
String[] eventArray = eventName.split("_|-");
if (eventArray.length >= 4) {
return eventArray[dataType.index];
@@ -269,7 +269,7 @@ public class JsonMessageParser {
return "";
}
- private String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
+ private static String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
if (jsonObject.has(jsonKey)) {
return jsonObject.get(jsonKey).getAsString();
} else {
@@ -278,11 +278,11 @@ public class JsonMessageParser {
}
}
- private boolean containsNotificationFields(JsonObject jsonObject) {
+ private static boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
+ private static Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
logger.error(errorMessage);
return Flux.empty();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 2b3a0ef6..ff267815 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -67,7 +67,7 @@ public class PublishedFileCache {
return publishedFiles.size();
}
- private boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
+ private static boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
final int timeToKeepInfoInSeconds = 60 * 60 * 24;
return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
index 8d433827..198c1bf1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
@@ -27,7 +27,9 @@ import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Future;
+
import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
@@ -44,8 +46,6 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
/**
* Client used to send requests to DataRouter.
@@ -139,22 +139,8 @@ public class DmaapProducerHttpClient {
request.addHeader("Authorization", "Basic " + base64Creds);
}
- /**
- * Gets a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. Specific parts can then be
- * added to the URI by the user.
- *
- * @return a <code>UriBuilder</code> containing the base URI needed talk to DataRouter.
- */
- public UriBuilder getBaseUri() {
- return new DefaultUriBuilderFactory().builder() //
- .scheme(configuration.dmaapProtocol()) //
- .host(configuration.dmaapHostName()) //
- .port(configuration.dmaapPortNumber());
- }
-
private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout,
- Map<String, String> contextMap)
- throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ Map<String, String> contextMap) throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
SSLContext sslContext =
new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index e50ef580..f1d33454 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
@@ -29,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -43,7 +45,7 @@ public class DMaaPMessageConsumer {
private final JsonMessageParser jsonMessageParser;
private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException {
this.jsonMessageParser = new JsonMessageParser();
this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
@@ -69,8 +71,9 @@ public class DMaaPMessageConsumer {
return jsonMessageParser.getMessagesFromJson(message);
}
- private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
- DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
+ private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig)
+ throws DatafileTaskException {
+ DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap();
WebClient client = new DmaapWebClient().fromConfiguration(config).build();
return new DMaaPConsumerReactiveHttpClient(config, client);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index e5dd01e9..1d6baa65 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -25,6 +25,7 @@ import com.google.gson.JsonParser;
import java.io.IOException;
import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
@@ -34,6 +35,8 @@ import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
@@ -46,6 +49,7 @@ import org.slf4j.MDC;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -58,12 +62,9 @@ import reactor.core.publisher.Mono;
public class DataRouterPublisher {
private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
private static final String CONTENT_TYPE = "application/octet-stream";
- private static final String PUBLISH_TOPIC = "publish";
- private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
- private DmaapProducerHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -80,7 +81,6 @@ public class DataRouterPublisher {
public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries,
Duration firstBackoff) {
MDC.setContextMap(publishInfo.getContext());
- dmaapProducerReactiveHttpClient = resolveClient();
return Mono.just(publishInfo) //
.cache() //
.flatMap(this::publishFile) //
@@ -92,13 +92,14 @@ public class DataRouterPublisher {
MDC.setContextMap(publishInfo.getContext());
logger.trace("Entering publishFile with {}", publishInfo);
try {
+ DmaapProducerHttpClient dmaapProducerHttpClient = resolveClient(publishInfo.getChangeIdentifier());
HttpPut put = new HttpPut();
prepareHead(publishInfo, put);
prepareBody(publishInfo, put);
- dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+ dmaapProducerHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
- dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
+ dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
@@ -107,11 +108,18 @@ public class DataRouterPublisher {
}
}
- private void prepareHead(FilePublishInformation publishInfo, HttpPut put) {
+ private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException {
+
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getPublishUri(publishInfo.getName()));
+ URI uri = new DefaultUriBuilderFactory(
+ datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) //
+ .builder() //
+ .pathSegment(publishInfo.getName()) //
+ .build();
+ put.setURI(uri);
+
MappedDiagnosticContext.appendTraceInfo(put);
}
@@ -122,14 +130,7 @@ public class DataRouterPublisher {
}
}
- private URI getPublishUri(String fileName) {
- return dmaapProducerReactiveHttpClient.getBaseUri() //
- .pathSegment(PUBLISH_TOPIC) //
- .pathSegment(DEFAULT_FEED_ID) //
- .pathSegment(fileName).build();
- }
-
- private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
+ private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
@@ -145,11 +146,17 @@ public class DataRouterPublisher {
return realResource.getInputStream();
}
- DmaapPublisherConfiguration resolveConfiguration() {
- return datafileAppConfig.getDmaapPublisherConfiguration();
+ PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException {
+ return datafileAppConfig.getPublisherConfiguration(changeIdentifer);
}
- DmaapProducerHttpClient resolveClient() {
- return new DmaapProducerHttpClient(resolveConfiguration());
+ DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException {
+ try {
+ DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap();
+ return new DmaapProducerHttpClient(cfg);
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Cannot resolve producer client", e);
+ }
+
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 0c62795e..6ddcb541 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -74,12 +74,12 @@ public class FileCollector {
return Mono.just(fileData) //
.cache() //
- .flatMap(fd -> collectFile(fileData, contextMap)) //
+ .flatMap(fd -> tryCollectFile(fileData, contextMap)) //
.retryBackoff(numRetries, firstBackoff) //
- .flatMap(this::checkCollectedFile);
+ .flatMap(FileCollector::checkCollectedFile);
}
- private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
+ private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
if (info.isPresent()) {
return Mono.just(info.get());
} else {
@@ -88,7 +88,7 @@ public class FileCollector {
}
}
- private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) {
+ private Mono<Optional<FilePublishInformation>> tryCollectFile(FileData fileData, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("starting to collectFile {}", fileData.name());
@@ -110,7 +110,7 @@ public class FileCollector {
}
} catch (Exception throwable) {
logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
- throwable.toString());
+ throwable.toString(), throwable);
return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
}
}
@@ -126,7 +126,7 @@ public class FileCollector {
}
}
- private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
+ private static FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
Map<String, String> context) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
@@ -143,6 +143,7 @@ public class FileCollector {
.compression(fileData.compression()) //
.fileFormatType(fileData.fileFormatType()) //
.fileFormatVersion(fileData.fileFormatVersion()) //
+ .changeIdentifier(fileData.messageMetaData().changeIdentifier()) //
.context(context) //
.build();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index e18da248..4d8d679d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -21,18 +21,23 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Map;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -44,12 +49,9 @@ import org.slf4j.MDC;
*
*/
public class PublishedChecker {
- private static final String FEEDLOG_TOPIC = "feedlog";
- private static final String DEFAULT_FEED_ID = "1";
- private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
+ private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
private final AppConfig appConfig;
/**
@@ -66,19 +68,24 @@ public class PublishedChecker {
*
* @param fileName the name of the file used when it is published.
*
- * @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
+ * @return <code>true</code> if the file has been published before, <code>false</code>
+ * otherwise.
+ * @throws DatafileTaskException if the check fails
*/
- public boolean isFilePublished(String fileName, Map<String, String> contextMap) {
+ public boolean isFilePublished(String fileName, String changeIdentifier, Map<String, String> contextMap)
+ throws DatafileTaskException {
MDC.setContextMap(contextMap);
- DmaapProducerHttpClient producerClient = resolveClient();
+ PublisherConfiguration publisherConfig = resolveConfiguration(changeIdentifier);
+
+ DmaapProducerHttpClient producerClient = resolveClient(publisherConfig);
HttpGet getRequest = new HttpGet();
MappedDiagnosticContext.appendTraceInfo(getRequest);
- getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
- producerClient.addUserCredentialsToHead(getRequest);
-
try {
+ getRequest.setURI(getPublishedQueryUri(fileName, publisherConfig));
+ producerClient.addUserCredentialsToHead(getRequest);
+
HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest,
WEB_CLIENT_TIMEOUT, contextMap);
@@ -95,20 +102,23 @@ public class PublishedChecker {
}
}
- private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) {
- return producerClient.getBaseUri() //
- .pathSegment(FEEDLOG_TOPIC) //
- .pathSegment(DEFAULT_FEED_ID) //
- .queryParam("type", "pub") //
- .queryParam("filename", fileName) //
+ private static URI getPublishedQueryUri(String fileName, PublisherConfiguration config) throws URISyntaxException {
+ return new URIBuilder(config.logUrl()) //
+ .addParameter("type", "pub") //
+ .addParameter("filename", fileName) //
.build();
}
- protected DmaapPublisherConfiguration resolveConfiguration() {
- return appConfig.getDmaapPublisherConfiguration();
+ protected PublisherConfiguration resolveConfiguration(String changeIdentifier) throws DatafileTaskException {
+ return appConfig.getPublisherConfiguration(changeIdentifier);
}
- protected DmaapProducerHttpClient resolveClient() {
- return new DmaapProducerHttpClient(resolveConfiguration());
+ protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig)
+ throws DatafileTaskException {
+ try {
+ return new DmaapProducerHttpClient(publisherConfig.toDmaap());
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Cannot create published checker client", e);
+ }
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index b5fa0c24..bac52659 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -86,13 +87,16 @@ public class ScheduledTasks {
threadPoolQueueSize.get());
return;
}
+ if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) {
+ logger.warn("No configuration loaded, skipping polling for messages");
+ return;
+ }
currentNumberOfSubscriptions.incrementAndGet();
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
- applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
- .subscribe(this::onSuccess, //
+ .subscribe(ScheduledTasks::onSuccess, //
throwable -> {
onError(throwable, context);
currentNumberOfSubscriptions.decrementAndGet();
@@ -115,6 +119,7 @@ public class ScheduledTasks {
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> createMdcContext(fileData, context)) //
+ .filter(this::isFeedConfigured) //
.filter(this::shouldBePublished) //
.flatMap(this::fetchFile, false, 1, 1) //
.flatMap(this::publishToDataRouter, false, 1, 1) //
@@ -124,13 +129,13 @@ public class ScheduledTasks {
}
private class FileDataWithContext {
- FileDataWithContext(FileData fileData, Map<String, String> context) {
+ public final FileData fileData;
+ public final Map<String, String> context;
+
+ public FileDataWithContext(FileData fileData, Map<String, String> context) {
this.fileData = fileData;
this.context = context;
}
-
- final FileData fileData;
- final Map<String, String> context;
}
/**
@@ -160,7 +165,7 @@ public class ScheduledTasks {
return this.threadPoolQueueSize.get();
}
- protected DMaaPMessageConsumer createConsumerTask() {
+ protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
@@ -172,17 +177,17 @@ public class ScheduledTasks {
return new DataRouterPublisher(applicationConfiguration);
}
- private void onComplete(Map<String, String> contextMap) {
+ private static void onComplete(Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.trace("Datafile tasks have been completed");
}
- private synchronized void onSuccess(FilePublishInformation publishInfo) {
+ private static synchronized void onSuccess(FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
logger.info("Datafile file published {}", publishInfo.getInternalLocation());
}
- private void onError(Throwable throwable, Map<String, String> context) {
+ private static void onError(Throwable throwable, Map<String, String> context) {
MDC.setContextMap(context);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
@@ -194,11 +199,26 @@ public class ScheduledTasks {
return Mono.just(pair);
}
+ private boolean isFeedConfigured(FileDataWithContext fileData) {
+ if (applicationConfiguration.isFeedConfigured(fileData.fileData.messageMetaData().changeIdentifier())) {
+ return true;
+ } else {
+ logger.info("No feed is configured for: {}, file ignored: {}",
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name());
+ return false;
+ }
+ }
+
private boolean shouldBePublished(FileDataWithContext fileData) {
boolean result = false;
Path localFilePath = fileData.fileData.getLocalFilePath();
if (publishedFilesCache.put(localFilePath) == null) {
- result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context);
+ try {
+ result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
+ } catch (DatafileTaskException e) {
+ logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
+ }
}
if (!result) {
currentNumberOfTasks.decrementAndGet();
@@ -248,13 +268,19 @@ public class ScheduledTasks {
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
logger.info(
- "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}",
+ "Consuming new file ready messages, current number of tasks: {}, published files: {}, "
+ + "number of subscriptions: {}",
getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
Map<String, String> context = MDC.getCopyOfContextMap();
- return createConsumerTask() //
- .getMessageRouterResponse() //
- .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ try {
+ return createConsumerTask() //
+ .getMessageRouterResponse() //
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ } catch (Exception e) {
+ logger.error("Could not create message consumer task", e);
+ return Flux.empty();
+ }
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
@@ -264,7 +290,7 @@ public class ScheduledTasks {
return Flux.empty();
}
- private void deleteFile(Path localFile, Map<String, String> context) {
+ private static void deleteFile(Path localFile, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("Deleting file: {}", localFile);
try {