summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-05-29 09:39:53 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-05-29 09:39:53 +0000
commit4229afc64d82cbd1ea1e43c92fcd6c9bed9e5137 (patch)
tree6c5c6efecb383a439f27cd163b20edfa364b6ce2 /datafile-app-server/src
parent2187fc6e6b4b4783389289fb87bd6a3676ad2c72 (diff)
Generalizing Data File Collection to handle any type of file
Extension of the DFC to be able to handle any file types types, which are published on different DR feeds. This association between file type and DR feed is defined by configuration. The file type is defined by the changeIdentifier in the fileReady VES message reported from the PNF. The creation of DR feeds and configuration will be done by the DMAAP plugin, but that is not tested yet. Change-Id: I13b36acd926a6941ee733e6b37922049fb54a5d9 Issue-ID: DCAEGEN2-1532 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java211
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java141
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java110
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java105
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java5
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java74
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java24
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java3
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java46
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java20
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java9
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java52
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java58
-rw-r--r--datafile-app-server/src/main/resources/datafile_endpoints.json37
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java356
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java133
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java38
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java63
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java72
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java6
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java10
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java33
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java4
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java62
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java61
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints.json44
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints_test.json31
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json49
41 files changed, 1054 insertions, 914 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index a38eab8f..d324ca99 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -22,22 +22,38 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
import java.util.ServiceLoader;
+
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
/**
* Holds all configuration for the DFC.
*
@@ -46,105 +62,174 @@ import org.springframework.stereotype.Component;
*/
@Component
+@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
@EnableConfigurationProperties
@ConfigurationProperties("app")
public class AppConfig {
-
- private static final String CONFIG = "configs";
- private static final String DMAAP = "dmaap";
- private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
- private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
- private static final String FTP = "ftp";
- private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
- private static final String SECURITY = "security";
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
- private DmaapConsumerConfiguration dmaapConsumerConfiguration;
- private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private ConsumerConfiguration dmaapConsumerConfiguration;
+ private Map<String, PublisherConfiguration> publishingConfiguration;
private FtpesConfig ftpesConfiguration;
+ private CloudConfigurationProvider cloudConfigurationProvider;
+ @Value("#{systemEnvironment}")
+ Properties systemEnvironment;
+ private Disposable refreshConfigTask = null;
@NotEmpty
private String filepath;
- public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ @Autowired
+ public synchronized void setCloudConfigurationProvider(
+ CloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ this.cloudConfigurationProvider = reactiveCloudConfigurationProvider;
+ }
+
+ public synchronized void setFilepath(String filepath) {
+ this.filepath = filepath;
+ }
+
+ /**
+ * Reads the cloud configuration.
+ */
+ public void initialize() {
+ stop();
+ Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
+ loadConfigurationFromFile();
+
+ refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5))
+ .flatMap(count -> createRefreshConfigurationTask(count, context))
+ .subscribe(e -> logger.info("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ () -> logger.error("Configuration refresh terminated"));
+ }
+
+ public void stop() {
+ if (refreshConfigTask != null) {
+ refreshConfigTask.dispose();
+ refreshConfigTask = null;
+ }
+ }
+
+ public synchronized ConsumerConfiguration getDmaapConsumerConfiguration() {
return dmaapConsumerConfiguration;
}
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherConfiguration;
+ public synchronized boolean isFeedConfigured(String changeIdentifier) {
+ return publishingConfiguration.containsKey(changeIdentifier);
+ }
+
+ public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier)
+ throws DatafileTaskException {
+
+ if (publishingConfiguration == null) {
+ throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier);
+ }
+ PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier);
+ if (cfg == null) {
+ throw new DatafileTaskException(
+ "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier);
+ }
+ return cfg;
}
public synchronized FtpesConfig getFtpesConfiguration() {
return ftpesConfiguration;
}
+ Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) {
+ return Flux.just(counter) //
+ .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) //
+ .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) //
+ .flatMap(this::fetchConfiguration);
+ }
+
+ Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context)
+ .onErrorResume(AppConfig::onErrorResume);
+ }
+
+ private static <R> Mono<R> onErrorResume(Throwable trowable) {
+ logger.error("Could not refresh application configuration {}", trowable.toString());
+ return Mono.empty();
+ }
+
+ private Mono<AppConfig> fetchConfiguration(EnvProperties env) {
+ Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) //
+ .onErrorResume(AppConfig::onErrorResume);
+
+ // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the
+ // other ones does not work
+ EnvProperties dmaapEnv = ImmutableEnvProperties.builder() //
+ .consulHost(env.consulHost()) //
+ .consulPort(env.consulPort()) //
+ .cbsName(env.cbsName()) //
+ .appName(env.appName() + ":dmaap") //
+ .build(); //
+ Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv)
+ .onErrorResume(t -> Mono.just(new JsonObject()));
+
+ return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) //
+ .onErrorResume(AppConfig::onErrorResume);
+ }
+
+ /**
+ * parse configuration
+ *
+ * @param serviceConfigRootObject
+ * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken
+ * from the serviceConfigRootObject
+ * @return this which is updated if successful
+ */
+ private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) {
+ try {
+ CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
+ setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(),
+ parser.getFtpesConfig());
+ } catch (DatafileTaskException e) {
+ logger.error("Could not parse configuration {}", e.toString(), e);
+ }
+ return this;
+ }
+
/**
* Reads the configuration from file.
*/
- public void loadConfigurationFromFile() {
+ void loadConfigurationFromFile() {
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- JsonParser parser = new JsonParser();
- JsonObject jsonObject;
+
try (InputStream inputStream = createInputStream(filepath)) {
- JsonElement rootElement = getJsonElement(parser, inputStream);
- if (rootElement.isJsonObject()) {
- jsonObject = rootElement.getAsJsonObject();
- FtpesConfig ftpesConfig = deserializeType(gsonBuilder,
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
- FtpesConfig.class);
- DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder,
- concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_CONSUMER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapConsumerConfiguration.class);
-
- DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder,
- concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_PRODUCER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapPublisherConfiguration.class);
-
- setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
+ JsonParser parser = new JsonParser();
+ JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject();
+ if (rootObject == null) {
+ throw new JsonSyntaxException("Root is not a json object");
}
+ parseCloudConfig(rootObject, rootObject);
} catch (JsonSyntaxException | IOException e) {
- logger.error("Problem with loading configuration, file: {}", filepath, e);
+ logger.warn("Local configuration file not loaded: {}", filepath, e);
}
}
- synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration,
- DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) {
- this.dmaapConsumerConfiguration = consumerConfiguration;
- this.dmaapPublisherConfiguration = publisherConfiguration;
- this.ftpesConfiguration = ftpesConfig;
+ private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
+ Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
+ if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
+ logger.error(
+ "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
+ consumerConfiguration, publisherConfiguration, ftpesConfig);
+ } else {
+ this.dmaapConsumerConfiguration = consumerConfiguration;
+ this.publishingConfiguration = publisherConfiguration;
+ this.ftpesConfiguration = ftpesConfig;
+ }
}
JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
return parser.parse(new InputStreamReader(inputStream));
}
- private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
- @NotNull Class<T> type) {
- return gsonBuilder.create().fromJson(jsonObject, type);
- }
-
InputStream createInputStream(@NotNull String filepath) throws IOException {
return new BufferedInputStream(new FileInputStream(filepath));
}
- synchronized String getFilepath() {
- return this.filepath;
- }
-
- public synchronized void setFilepath(String filepath) {
- this.filepath = filepath;
- }
-
- private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
- source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue()));
- return target;
- }
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index 6b7860c4..3ac6b2c6 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -18,11 +18,17 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
@@ -32,63 +38,106 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class CloudConfigParser {
-
private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
- private final JsonObject jsonObject;
+ private final JsonObject serviceConfigurationRoot;
+ private final JsonObject dmaapConfigurationRoot;
- CloudConfigParser(JsonObject jsonObject) {
- this.jsonObject = jsonObject;
+ public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) {
+ this.serviceConfigurationRoot = serviceConfigurationRoot;
+ this.dmaapConfigurationRoot = dmaapConfigurationRoot;
}
- DmaapPublisherConfiguration getDmaapPublisherConfig() {
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
- .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
- .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
- .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
- .build();
+ public Map<String, PublisherConfiguration> getDmaapPublisherConfig() throws DatafileTaskException {
+ Iterator<JsonElement> producerCfgs =
+ toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator();
+
+ Map<String, PublisherConfiguration> result = new HashMap<>();
+
+ while (producerCfgs.hasNext()) {
+ JsonObject producerCfg = producerCfgs.next().getAsJsonObject();
+ String feedName = getAsString(producerCfg, "feedName");
+ JsonObject feedConfig = getFeedConfig(feedName);
+
+ PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
+ .publishUrl(getAsString(feedConfig, "publish_url")) //
+ .passWord(getAsString(feedConfig, "password")) //
+ .userName(getAsString(feedConfig, "username")) //
+ .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) //
+ .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
+ .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) //
+ .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
+ .enableDmaapCertAuth(
+ get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) //
+ .logUrl(getAsString(feedConfig, "log_url")) //
+ .build();
+
+ result.put(cfg.changeIdentifier(), cfg);
+ }
+ return result;
}
- DmaapConsumerConfiguration getDmaapConsumerConfig() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt())
- .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
- .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
- .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
- .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
- .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
- .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
+ JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject();
+ Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
+ if (topics.size() != 1) {
+ throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics);
+ }
+ JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
+ JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject();
+ String topicUrl = getAsString(dmaapInfo, "topic_url");
+
+ return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
+ .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH))
+ .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
+ .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH))
+ .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
+ .enableDmaapCertAuth(
+ get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
- FtpesConfig getFtpesConfig() {
+ public FtpesConfig getFtpesConfig() throws DatafileTaskException {
return new ImmutableFtpesConfig.Builder() //
- .keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString())
- .keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString())
- .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString())
- .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) //
+ .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert"))
+ .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword"))
+ .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa"))
+ .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) //
.build();
}
+
+ private static JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException {
+ JsonElement elem = obj.get(memberName);
+ if (elem == null) {
+ throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj);
+ }
+ return elem;
+ }
+
+ private static String getAsString(JsonObject obj, String memberName) throws DatafileTaskException {
+ return get(obj, memberName).getAsString();
+ }
+
+ private JsonObject getFeedConfig(String feedName) throws DatafileTaskException {
+ JsonElement elem = dmaapConfigurationRoot.get(feedName);
+ if (elem == null) {
+ elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under
+ // serviceConfigurationRoot
+ }
+ return elem.getAsJsonObject();
+ }
+
+ private static JsonArray toArray(JsonElement obj) {
+ if (obj.isJsonArray()) {
+ return obj.getAsJsonArray();
+ }
+ JsonArray arr = new JsonArray();
+ arr.add(obj);
+ return arr;
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
deleted file mode 100644
index 597f525f..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import com.google.gson.JsonObject;
-import java.util.Map;
-import java.util.Properties;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Primary;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-
-/**
- * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Configuration
-@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
-@EnableConfigurationProperties
-@EnableScheduling
-@Primary
-public class CloudConfiguration extends AppConfig {
- private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class);
- private ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider;
- private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
- private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration;
- private FtpesConfig ftpesCloudConfiguration;
-
- @Value("#{systemEnvironment}")
- private Properties systemEnvironment;
-
- @Autowired
- public synchronized void setThreadPoolTaskScheduler(
- ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
- this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
- }
-
- /**
- * Reads the cloud configuration.
- */
- public void runTask() {
- Map<String,String> context = MappedDiagnosticContext.initializeTraceContext();
- EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) //
- .subscribeOn(Schedulers.parallel()) //
- .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) //
- .flatMap(this::parseCloudConfig) //
- .subscribe(null, this::onError, this::onComplete);
- }
-
- private void onComplete() {
- logger.trace("Configuration updated");
- }
-
- private void onError(Throwable throwable) {
- logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
- }
-
- private synchronized Mono<CloudConfiguration> parseCloudConfig(JsonObject jsonObject) {
- logger.info("Received application configuration: {}", jsonObject);
- CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
- dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
- dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
- ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig();
- return Mono.just(this);
- }
-
- @Override
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration
- : super.getDmaapPublisherConfiguration();
- }
-
- @Override
- public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration
- : super.getDmaapConsumerConfiguration();
- }
-
- @Override
- public synchronized FtpesConfig getFtpesConfiguration() {
- return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
new file mode 100644
index 00000000..fc9ab204
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
@@ -0,0 +1,105 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+@Gson.TypeAdapters
+public abstract class ConsumerConfiguration {
+ @Value.Redacted
+ public abstract String topicUrl();
+
+ public abstract String trustStorePath();
+
+ public abstract String trustStorePasswordPath();
+
+ public abstract String keyStorePath();
+
+ public abstract String keyStorePasswordPath();
+
+ public abstract Boolean enableDmaapCertAuth();
+
+ public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException {
+ try {
+ URL url = new URL(topicUrl());
+ String passwd = "";
+ String userName = "";
+ if (url.getUserInfo() != null) {
+ String[] userInfo = url.getUserInfo().split(":");
+ userName = userInfo[0];
+ passwd = userInfo[1];
+ }
+ String urlPath = url.getPath();
+ DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+
+ return new ImmutableDmaapConsumerConfiguration.Builder() //
+ .dmaapContentType("application/json") //
+ .dmaapPortNumber(url.getPort()) //
+ .dmaapHostName(url.getHost()) //
+ .dmaapTopicName(path.dmaapTopicName) //
+ .dmaapProtocol(url.getProtocol()) //
+ .dmaapUserName(userName) //
+ .dmaapUserPassword(passwd) //
+ .trustStorePath(this.trustStorePath()) //
+ .trustStorePasswordPath(this.trustStorePasswordPath()) //
+ .keyStorePath(this.keyStorePath()) //
+ .keyStorePasswordPath(this.keyStorePasswordPath()) //
+ .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
+ .consumerId(path.consumerId) //
+ .consumerGroup(path.consumerGroup) //
+ .timeoutMs(-1) //
+ .messageLimit(-1) //
+ .build();
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Could not parse the URL", e);
+ }
+ }
+
+ private class DmaapConsumerUrlPath {
+ final String dmaapTopicName;
+ final String consumerGroup;
+ final String consumerId;
+
+ DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+ this.dmaapTopicName = dmaapTopicName;
+ this.consumerGroup = consumerGroup;
+ this.consumerId = consumerId;
+ }
+ }
+
+ private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException {
+ String[] tokens = urlPath.split("/"); // UrlPath: /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
+ if (tokens.length != 5) {
+ throw new DatafileTaskException("The path has incorrect syntax: " + urlPath);
+ }
+
+ final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // ex. // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
+ final String consumerGroup = tokens[3]; // ex. OpenDcae-c12
+ final String consumerId = tokens[4]; // ex. C12
+ return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 71003f80..62af92a8 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -19,12 +19,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+
import reactor.core.publisher.Mono;
/**
@@ -53,7 +55,7 @@ class EnvironmentProcessor {
} catch (EnvironmentLoaderException e) {
return Mono.error(e);
}
- logger.info("Evaluated environment system variables {}", envProperties);
+ logger.trace("Evaluated environment system variables {}", envProperties);
return Mono.just(envProperties);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
index 3f029359..844699eb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import java.io.Serializable;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.springframework.stereotype.Component;
@@ -28,7 +29,7 @@ import org.springframework.stereotype.Component;
@Component
@Value.Immutable
-@Value.Style(builder = "new")
+@Value.Style(builder = "new", redactedMask = "####")
@Gson.TypeAdapters
public abstract class FtpesConfig implements Serializable {
@@ -38,11 +39,13 @@ public abstract class FtpesConfig implements Serializable {
public abstract String keyCert();
@Value.Parameter
+ @Value.Redacted
public abstract String keyPassword();
@Value.Parameter
public abstract String trustedCa();
@Value.Parameter
+ @Value.Redacted
public abstract String trustedCaPassword();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
new file mode 100644
index 00000000..5576ed26
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
@@ -0,0 +1,74 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+@Gson.TypeAdapters
+public interface PublisherConfiguration {
+
+ String publishUrl();
+
+ String logUrl();
+
+ String userName();
+
+ @Value.Redacted
+ String passWord();
+
+ String trustStorePath();
+
+ String trustStorePasswordPath();
+
+ String keyStorePath();
+
+ String keyStorePasswordPath();
+
+ Boolean enableDmaapCertAuth();
+
+ String changeIdentifier();
+
+ default DmaapPublisherConfiguration toDmaap() throws MalformedURLException {
+ URL url = new URL(publishUrl());
+ String urlPath = url.getPath();
+
+ return new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapContentType("application/octet-stream") //
+ .dmaapPortNumber(url.getPort()) //
+ .dmaapHostName(url.getHost()) //
+ .dmaapTopicName(urlPath) //
+ .dmaapProtocol(url.getProtocol()) //
+ .dmaapUserName(this.userName()) //
+ .dmaapUserPassword(this.passWord()) //
+ .trustStorePath(this.trustStorePath()) //
+ .trustStorePasswordPath(this.trustStorePasswordPath()) //
+ .keyStorePath(this.keyStorePath()) //
+ .keyStorePasswordPath(this.keyStorePasswordPath()) //
+ .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
+ .build();
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index b78e4ae5..5835de1a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,7 +16,6 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import io.swagger.annotations.ApiOperation;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -24,7 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.slf4j.Logger;
@@ -36,6 +37,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
+import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
@@ -49,7 +52,6 @@ import reactor.core.publisher.Mono;
public class SchedulerConfig {
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
- private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
@@ -57,21 +59,21 @@ public class SchedulerConfig {
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
- private final CloudConfiguration cloudConfiguration;
+ private final AppConfig configuration;
/**
* Constructor.
*
* @param taskScheduler The scheduler used to schedule the tasks.
* @param scheduledTasks The scheduler that will actually handle the tasks.
- * @param cloudConfiguration The DFC configuration.
+ * @param configuration The DFC configuration.
*/
@Autowired
public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks,
- CloudConfiguration cloudConfiguration) {
+ AppConfig configuration) {
this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTasks;
- this.cloudConfiguration = cloudConfiguration;
+ this.configuration = configuration;
}
/**
@@ -83,6 +85,7 @@ public class SchedulerConfig {
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
+ configuration.stop();
MDC.setContextMap(contextMap);
logger.info("Stopped Datafile workflow");
MDC.clear();
@@ -99,12 +102,11 @@ public class SchedulerConfig {
public synchronized boolean tryToStartTask() {
contextMap = MappedDiagnosticContext.initializeTraceContext();
logger.info("Start scheduling Datafile workflow");
+ configuration.initialize();
+
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask,
- Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(
- taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
- SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
scheduledFutureList
.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
index 71242265..5a78261a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
@@ -62,7 +62,7 @@ public class SwaggerConfig extends WebMvcConfigurationSupport {
.build();
}
- private ApiInfo apiInfo() {
+ private static ApiInfo apiInfo() {
return new ApiInfoBuilder() //
.title(API_TITLE) //
.description(DESCRIPTION) //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
index cbd67297..847ca46e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
@@ -41,7 +41,7 @@ public class TomcatHttpConfig {
return tomcat;
}
- private Connector getHttpConnector() {
+ private static Connector getHttpConnector() {
Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL);
connector.setScheme("http");
connector.setPort(8100);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 4716fa87..791f0cf1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -71,7 +71,7 @@ public class ScheduleController {
public Mono<ResponseEntity<String>> startTasks() {
return Mono.fromSupplier(schedulerConfig::tryToStartTask) //
- .map(this::createStartTaskResponse);
+ .map(ScheduleController::createStartTaskResponse);
}
/**
@@ -90,7 +90,7 @@ public class ScheduleController {
}
@ApiOperation(value = "Sends success or error response on starting task execution")
- private ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) {
+ private static ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) {
if (wasScheduled) {
return new ResponseEntity<>("Datafile Service has been started!", HttpStatus.CREATED);
} else {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
index 4c49dd8a..72623db2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -17,6 +17,7 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
import java.util.Optional;
+
import org.immutables.value.Value;
/**
@@ -26,11 +27,13 @@ import org.immutables.value.Value;
*
*/
@Value.Immutable
+@Value.Style(redactedMask = "####")
public interface FileServerData {
public String serverAddress();
public String userId();
+ @Value.Redacted
public String password();
public Optional<Integer> port();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index c78ae3a3..bb3016ce 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -128,7 +128,7 @@ public class FtpsClient implements FileCollectClient {
logger.trace("collectFile fetched: {}", localFileName);
}
- private int getPort(Optional<Integer> port) {
+ private static int getPort(Optional<Integer> port) {
return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
}
@@ -180,7 +180,7 @@ public class FtpsClient implements FileCollectClient {
logger.warn("Local file {} already created", localFileName);
}
OutputStream output = new FileOutputStream(localFile);
- logger.debug("File {} opened xNF", localFileName);
+ logger.trace("File {} opened xNF", localFileName);
return output;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index 333be92a..bdaf6d4c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -55,7 +55,7 @@ public class SftpClient implements FileCollectClient {
try {
sftpChannel.get(remoteFile, localFile.toString());
- logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ logger.trace("File {} Download Successfull from xNF", localFile.getFileName());
} catch (SftpException e) {
boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED;
throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry);
@@ -90,11 +90,11 @@ public class SftpClient implements FileCollectClient {
}
}
- private int getPort(Optional<Integer> port) {
+ private static int getPort(Optional<Integer> port) {
return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
}
- private Session setUpSession(FileServerData fileServerData) throws JSchException {
+ private static Session setUpSession(FileServerData fileServerData) throws JSchException {
JSch jsch = new JSch();
Session newSession =
@@ -105,7 +105,7 @@ public class SftpClient implements FileCollectClient {
return newSession;
}
- private ChannelSftp getChannel(Session session) throws JSchException {
+ private static ChannelSftp getChannel(Session session) throws JSchException {
Channel channel = session.openChannel("sftp");
channel.connect();
return (ChannelSftp) channel;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 0a6b669c..36aae949 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -38,6 +38,7 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
*/
@Value.Immutable
@Gson.TypeAdapters
+@Value.Style(redactedMask = "####")
public abstract class FileData {
public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
@@ -53,6 +54,7 @@ public abstract class FileData {
*
* @return the URL to use to fetch the file from the PNF
*/
+ @Value.Redacted
public abstract String location();
/**
@@ -123,7 +125,7 @@ public abstract class FileData {
* @return An <code>Optional</code> containing a String array with the user name and password if given, or an empty
* <code>Optional</code> if not given.
*/
- private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+ private static Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
if (userInfoString != null) {
String[] userAndPassword = userInfoString.split(":");
if (userAndPassword.length == 2) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
index 63ed0daa..5b8c015e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
@@ -33,6 +33,7 @@ import org.immutables.value.Value;
@Value.Immutable
@Gson.TypeAdapters
+@Value.Style(redactedMask = "####")
public interface FilePublishInformation {
@SerializedName("productName")
@@ -54,6 +55,7 @@ public interface FilePublishInformation {
String getTimeZoneOffset();
@SerializedName("location")
+ @Value.Redacted
String getLocation();
@SerializedName("compression")
@@ -70,4 +72,6 @@ public interface FilePublishInformation {
String getName();
Map<String, String> getContext();
+
+ String getChangeIdentifier();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
index 7081d1ac..b8df125b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
@@ -31,12 +31,12 @@ import java.util.Set;
*/
public abstract class JsonSerializer {
+ private JsonSerializer() {}
- private static Gson gson =
- new GsonBuilder() //
- .serializeNulls() //
- .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) //
- .create(); //
+ private static Gson gson = new GsonBuilder() //
+ .serializeNulls() //
+ .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) //
+ .create(); //
/**
* Serializes a <code>filePublishInformation</code>.
@@ -56,9 +56,10 @@ public abstract class JsonSerializer {
private final Set<String> inclusions =
Sets.newHashSet("productName", "vendorName", "lastEpochMicrosec", "sourceName", "startEpochMicrosec",
"timeZoneOffset", "location", "compression", "fileFormatType", "fileFormatVersion");
+
@Override
- public boolean shouldSkipField(FieldAttributes f) {
- return !inclusions.contains(f.getName());
+ public boolean shouldSkipField(FieldAttributes fieldAttributes) {
+ return !inclusions.contains(fieldAttributes.getName());
}
@Override
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 3a3eb3aa..470c4e73 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -22,11 +22,13 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
+
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -37,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -70,7 +73,6 @@ public class JsonMessageParser {
private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
private static final String FILE_READY_CHANGE_TYPE = "FileReady";
- private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
/**
* The data types available in the event name.
@@ -92,7 +94,7 @@ public class JsonMessageParser {
* @return a <code>Flux</code> containing messages.
*/
public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
+ return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData);
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -123,18 +125,17 @@ public class JsonMessageParser {
: getMessagesFromJsonArray(jsonElement);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- logger.trace("original message from message router: {}", message);
+ private static Mono<JsonElement> getJsonParserMessage(String message) {
return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
+ private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
: logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
- private Mono<FileReadyMessage> transformMessages(JsonObject message) {
+ private static Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
@@ -159,7 +160,7 @@ public class JsonMessageParser {
}
- private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
+ private static Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
@@ -182,15 +183,15 @@ public class JsonMessageParser {
.changeIdentifier(changeIdentifier) //
.changeType(changeType) //
.build();
- if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
+ if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
String errorMessage = "VES event parsing.";
if (!missingValues.isEmpty()) {
errorMessage += " Missing data: " + missingValues;
}
- if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) {
- errorMessage += " Change identifier or change type is wrong.";
+ if (!isChangeTypeCorrect(changeType)) {
+ errorMessage += " Change type is wrong: " + changeType + " expected: " + FILE_READY_CHANGE_TYPE;
}
errorMessage += " Message: {}";
logger.error(errorMessage, message);
@@ -198,15 +199,12 @@ public class JsonMessageParser {
}
}
- private boolean isChangeTypeCorrect(String changeType) {
+ private static boolean isChangeTypeCorrect(String changeType) {
return FILE_READY_CHANGE_TYPE.equals(changeType);
}
- private boolean isChangeIdentifierCorrect(String changeIdentifier) {
- return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
- }
-
- private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) {
+ private static List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields,
+ MessageMetaData messageMetaData) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
@@ -219,7 +217,7 @@ public class JsonMessageParser {
return res;
}
- private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
+ private static Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
@@ -250,15 +248,17 @@ public class JsonMessageParser {
}
/**
- * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description},
- * example: Noti_RnNode-Ericsson_FileReady
+ * Gets data from the event name. Defined as:
+ * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
+ * Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
* @param eventName The event name to get the data from.
* @param missingValues List of missing values. The dataType will be added if missing.
* @return String of data from event name
*/
- private String getDataFromEventName(EventNameDataType dataType, String eventName, List<String> missingValues) {
+ private static String getDataFromEventName(EventNameDataType dataType, String eventName,
+ List<String> missingValues) {
String[] eventArray = eventName.split("_|-");
if (eventArray.length >= 4) {
return eventArray[dataType.index];
@@ -269,7 +269,7 @@ public class JsonMessageParser {
return "";
}
- private String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
+ private static String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
if (jsonObject.has(jsonKey)) {
return jsonObject.get(jsonKey).getAsString();
} else {
@@ -278,11 +278,11 @@ public class JsonMessageParser {
}
}
- private boolean containsNotificationFields(JsonObject jsonObject) {
+ private static boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
+ private static Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
logger.error(errorMessage);
return Flux.empty();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 2b3a0ef6..ff267815 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -67,7 +67,7 @@ public class PublishedFileCache {
return publishedFiles.size();
}
- private boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
+ private static boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
final int timeToKeepInfoInSeconds = 60 * 60 * 24;
return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
index 8d433827..198c1bf1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
@@ -27,7 +27,9 @@ import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Future;
+
import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
@@ -44,8 +46,6 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
/**
* Client used to send requests to DataRouter.
@@ -139,22 +139,8 @@ public class DmaapProducerHttpClient {
request.addHeader("Authorization", "Basic " + base64Creds);
}
- /**
- * Gets a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. Specific parts can then be
- * added to the URI by the user.
- *
- * @return a <code>UriBuilder</code> containing the base URI needed talk to DataRouter.
- */
- public UriBuilder getBaseUri() {
- return new DefaultUriBuilderFactory().builder() //
- .scheme(configuration.dmaapProtocol()) //
- .host(configuration.dmaapHostName()) //
- .port(configuration.dmaapPortNumber());
- }
-
private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout,
- Map<String, String> contextMap)
- throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ Map<String, String> contextMap) throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
SSLContext sslContext =
new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index e50ef580..f1d33454 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
@@ -29,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -43,7 +45,7 @@ public class DMaaPMessageConsumer {
private final JsonMessageParser jsonMessageParser;
private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException {
this.jsonMessageParser = new JsonMessageParser();
this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
@@ -69,8 +71,9 @@ public class DMaaPMessageConsumer {
return jsonMessageParser.getMessagesFromJson(message);
}
- private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
- DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
+ private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig)
+ throws DatafileTaskException {
+ DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap();
WebClient client = new DmaapWebClient().fromConfiguration(config).build();
return new DMaaPConsumerReactiveHttpClient(config, client);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index e5dd01e9..1d6baa65 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -25,6 +25,7 @@ import com.google.gson.JsonParser;
import java.io.IOException;
import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
@@ -34,6 +35,8 @@ import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
@@ -46,6 +49,7 @@ import org.slf4j.MDC;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -58,12 +62,9 @@ import reactor.core.publisher.Mono;
public class DataRouterPublisher {
private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
private static final String CONTENT_TYPE = "application/octet-stream";
- private static final String PUBLISH_TOPIC = "publish";
- private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
- private DmaapProducerHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -80,7 +81,6 @@ public class DataRouterPublisher {
public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries,
Duration firstBackoff) {
MDC.setContextMap(publishInfo.getContext());
- dmaapProducerReactiveHttpClient = resolveClient();
return Mono.just(publishInfo) //
.cache() //
.flatMap(this::publishFile) //
@@ -92,13 +92,14 @@ public class DataRouterPublisher {
MDC.setContextMap(publishInfo.getContext());
logger.trace("Entering publishFile with {}", publishInfo);
try {
+ DmaapProducerHttpClient dmaapProducerHttpClient = resolveClient(publishInfo.getChangeIdentifier());
HttpPut put = new HttpPut();
prepareHead(publishInfo, put);
prepareBody(publishInfo, put);
- dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+ dmaapProducerHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
- dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
+ dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
@@ -107,11 +108,18 @@ public class DataRouterPublisher {
}
}
- private void prepareHead(FilePublishInformation publishInfo, HttpPut put) {
+ private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException {
+
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getPublishUri(publishInfo.getName()));
+ URI uri = new DefaultUriBuilderFactory(
+ datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) //
+ .builder() //
+ .pathSegment(publishInfo.getName()) //
+ .build();
+ put.setURI(uri);
+
MappedDiagnosticContext.appendTraceInfo(put);
}
@@ -122,14 +130,7 @@ public class DataRouterPublisher {
}
}
- private URI getPublishUri(String fileName) {
- return dmaapProducerReactiveHttpClient.getBaseUri() //
- .pathSegment(PUBLISH_TOPIC) //
- .pathSegment(DEFAULT_FEED_ID) //
- .pathSegment(fileName).build();
- }
-
- private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
+ private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
@@ -145,11 +146,17 @@ public class DataRouterPublisher {
return realResource.getInputStream();
}
- DmaapPublisherConfiguration resolveConfiguration() {
- return datafileAppConfig.getDmaapPublisherConfiguration();
+ PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException {
+ return datafileAppConfig.getPublisherConfiguration(changeIdentifer);
}
- DmaapProducerHttpClient resolveClient() {
- return new DmaapProducerHttpClient(resolveConfiguration());
+ DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException {
+ try {
+ DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap();
+ return new DmaapProducerHttpClient(cfg);
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Cannot resolve producer client", e);
+ }
+
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 0c62795e..6ddcb541 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -74,12 +74,12 @@ public class FileCollector {
return Mono.just(fileData) //
.cache() //
- .flatMap(fd -> collectFile(fileData, contextMap)) //
+ .flatMap(fd -> tryCollectFile(fileData, contextMap)) //
.retryBackoff(numRetries, firstBackoff) //
- .flatMap(this::checkCollectedFile);
+ .flatMap(FileCollector::checkCollectedFile);
}
- private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
+ private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
if (info.isPresent()) {
return Mono.just(info.get());
} else {
@@ -88,7 +88,7 @@ public class FileCollector {
}
}
- private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) {
+ private Mono<Optional<FilePublishInformation>> tryCollectFile(FileData fileData, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("starting to collectFile {}", fileData.name());
@@ -110,7 +110,7 @@ public class FileCollector {
}
} catch (Exception throwable) {
logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
- throwable.toString());
+ throwable.toString(), throwable);
return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
}
}
@@ -126,7 +126,7 @@ public class FileCollector {
}
}
- private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
+ private static FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
Map<String, String> context) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
@@ -143,6 +143,7 @@ public class FileCollector {
.compression(fileData.compression()) //
.fileFormatType(fileData.fileFormatType()) //
.fileFormatVersion(fileData.fileFormatVersion()) //
+ .changeIdentifier(fileData.messageMetaData().changeIdentifier()) //
.context(context) //
.build();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index e18da248..4d8d679d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -21,18 +21,23 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Map;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -44,12 +49,9 @@ import org.slf4j.MDC;
*
*/
public class PublishedChecker {
- private static final String FEEDLOG_TOPIC = "feedlog";
- private static final String DEFAULT_FEED_ID = "1";
- private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
+ private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
private final AppConfig appConfig;
/**
@@ -66,19 +68,24 @@ public class PublishedChecker {
*
* @param fileName the name of the file used when it is published.
*
- * @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
+ * @return <code>true</code> if the file has been published before, <code>false</code>
+ * otherwise.
+ * @throws DatafileTaskException if the check fails
*/
- public boolean isFilePublished(String fileName, Map<String, String> contextMap) {
+ public boolean isFilePublished(String fileName, String changeIdentifier, Map<String, String> contextMap)
+ throws DatafileTaskException {
MDC.setContextMap(contextMap);
- DmaapProducerHttpClient producerClient = resolveClient();
+ PublisherConfiguration publisherConfig = resolveConfiguration(changeIdentifier);
+
+ DmaapProducerHttpClient producerClient = resolveClient(publisherConfig);
HttpGet getRequest = new HttpGet();
MappedDiagnosticContext.appendTraceInfo(getRequest);
- getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
- producerClient.addUserCredentialsToHead(getRequest);
-
try {
+ getRequest.setURI(getPublishedQueryUri(fileName, publisherConfig));
+ producerClient.addUserCredentialsToHead(getRequest);
+
HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest,
WEB_CLIENT_TIMEOUT, contextMap);
@@ -95,20 +102,23 @@ public class PublishedChecker {
}
}
- private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) {
- return producerClient.getBaseUri() //
- .pathSegment(FEEDLOG_TOPIC) //
- .pathSegment(DEFAULT_FEED_ID) //
- .queryParam("type", "pub") //
- .queryParam("filename", fileName) //
+ private static URI getPublishedQueryUri(String fileName, PublisherConfiguration config) throws URISyntaxException {
+ return new URIBuilder(config.logUrl()) //
+ .addParameter("type", "pub") //
+ .addParameter("filename", fileName) //
.build();
}
- protected DmaapPublisherConfiguration resolveConfiguration() {
- return appConfig.getDmaapPublisherConfiguration();
+ protected PublisherConfiguration resolveConfiguration(String changeIdentifier) throws DatafileTaskException {
+ return appConfig.getPublisherConfiguration(changeIdentifier);
}
- protected DmaapProducerHttpClient resolveClient() {
- return new DmaapProducerHttpClient(resolveConfiguration());
+ protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig)
+ throws DatafileTaskException {
+ try {
+ return new DmaapProducerHttpClient(publisherConfig.toDmaap());
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Cannot create published checker client", e);
+ }
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index b5fa0c24..bac52659 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -86,13 +87,16 @@ public class ScheduledTasks {
threadPoolQueueSize.get());
return;
}
+ if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) {
+ logger.warn("No configuration loaded, skipping polling for messages");
+ return;
+ }
currentNumberOfSubscriptions.incrementAndGet();
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
- applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
- .subscribe(this::onSuccess, //
+ .subscribe(ScheduledTasks::onSuccess, //
throwable -> {
onError(throwable, context);
currentNumberOfSubscriptions.decrementAndGet();
@@ -115,6 +119,7 @@ public class ScheduledTasks {
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> createMdcContext(fileData, context)) //
+ .filter(this::isFeedConfigured) //
.filter(this::shouldBePublished) //
.flatMap(this::fetchFile, false, 1, 1) //
.flatMap(this::publishToDataRouter, false, 1, 1) //
@@ -124,13 +129,13 @@ public class ScheduledTasks {
}
private class FileDataWithContext {
- FileDataWithContext(FileData fileData, Map<String, String> context) {
+ public final FileData fileData;
+ public final Map<String, String> context;
+
+ public FileDataWithContext(FileData fileData, Map<String, String> context) {
this.fileData = fileData;
this.context = context;
}
-
- final FileData fileData;
- final Map<String, String> context;
}
/**
@@ -160,7 +165,7 @@ public class ScheduledTasks {
return this.threadPoolQueueSize.get();
}
- protected DMaaPMessageConsumer createConsumerTask() {
+ protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
@@ -172,17 +177,17 @@ public class ScheduledTasks {
return new DataRouterPublisher(applicationConfiguration);
}
- private void onComplete(Map<String, String> contextMap) {
+ private static void onComplete(Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.trace("Datafile tasks have been completed");
}
- private synchronized void onSuccess(FilePublishInformation publishInfo) {
+ private static synchronized void onSuccess(FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
logger.info("Datafile file published {}", publishInfo.getInternalLocation());
}
- private void onError(Throwable throwable, Map<String, String> context) {
+ private static void onError(Throwable throwable, Map<String, String> context) {
MDC.setContextMap(context);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
@@ -194,11 +199,26 @@ public class ScheduledTasks {
return Mono.just(pair);
}
+ private boolean isFeedConfigured(FileDataWithContext fileData) {
+ if (applicationConfiguration.isFeedConfigured(fileData.fileData.messageMetaData().changeIdentifier())) {
+ return true;
+ } else {
+ logger.info("No feed is configured for: {}, file ignored: {}",
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name());
+ return false;
+ }
+ }
+
private boolean shouldBePublished(FileDataWithContext fileData) {
boolean result = false;
Path localFilePath = fileData.fileData.getLocalFilePath();
if (publishedFilesCache.put(localFilePath) == null) {
- result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context);
+ try {
+ result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
+ } catch (DatafileTaskException e) {
+ logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
+ }
}
if (!result) {
currentNumberOfTasks.decrementAndGet();
@@ -248,13 +268,19 @@ public class ScheduledTasks {
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
logger.info(
- "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}",
+ "Consuming new file ready messages, current number of tasks: {}, published files: {}, "
+ + "number of subscriptions: {}",
getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
Map<String, String> context = MDC.getCopyOfContextMap();
- return createConsumerTask() //
- .getMessageRouterResponse() //
- .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ try {
+ return createConsumerTask() //
+ .getMessageRouterResponse() //
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ } catch (Exception e) {
+ logger.error("Could not create message consumer task", e);
+ return Flux.empty();
+ }
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
@@ -264,7 +290,7 @@ public class ScheduledTasks {
return Flux.empty();
}
- private void deleteFile(Path localFile, Map<String, String> context) {
+ private static void deleteFile(Path localFile, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("Deleting file: {}", localFile);
try {
diff --git a/datafile-app-server/src/main/resources/datafile_endpoints.json b/datafile-app-server/src/main/resources/datafile_endpoints.json
deleted file mode 100644
index 8d45bc84..00000000
--- a/datafile-app-server/src/main/resources/datafile_endpoints.json
+++ /dev/null
@@ -1,37 +0,0 @@
-{
- "configs": {
- "dmaap": {
- "dmaapConsumerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 2222,
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
- "dmaapProtocol": "http",
- "dmaapUserName": "",
- "dmaapUserPassword": "",
- "dmaapContentType": "application/json",
- "consumerId": "C12",
- "consumerGroup": "OpenDcae-c12",
- "timeoutMs": -1,
- "messageLimit": 1
- },
- "dmaapProducerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 3907,
- "dmaapTopicName": "publish",
- "dmaapProtocol": "https",
- "dmaapUserName": "dradmin",
- "dmaapUserPassword": "dradmin",
- "dmaapContentType": "application/octet-stream"
- }
- },
- "ftp": {
- "ftpesConfiguration": {
- "keyCert": "config/dfc.jks",
- "keyPassword": "secret",
- "trustedCa": "config/ftp.jks",
- "trustedCaPassword": "secret"
- }
- }
- }
-}
-
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index 5be75ab3..b1148a6a 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
@@ -16,25 +16,51 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
import com.google.gson.JsonElement;
+import com.google.gson.JsonIOException;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.Objects;
+import java.util.Map;
+import java.util.Properties;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
/**
* Tests the AppConfig.
@@ -44,167 +70,285 @@ import org.junit.jupiter.api.Test;
*/
class AppConfigTest {
- private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json";
- private static final boolean CORRECT_JSON = true;
- private static final boolean INCORRECT_JSON = false;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+
+
+ private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+ new ImmutableDmaapConsumerConfiguration.Builder() //
+ .timeoutMs(-1) //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("admin") //
+ .dmaapUserPassword("admin") //
+ .dmaapTopicName("events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .dmaapPortNumber(2222) //
+ .dmaapContentType("application/json") //
+ .messageLimit(-1) //
+ .dmaapProtocol("http") //
+ .consumerId("C12") //
+ .consumerGroup("OpenDcae-c12") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
+ .topicUrl(
+ "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = //
+ ImmutablePublisherConfiguration.builder() //
+ .publishUrl("https://message-router.onap.svc.cluster.local:3907/publish/1") //
+ .logUrl("https://dmaap.example.com/feedlog/972").trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .changeIdentifier("PM_MEAS_FILES") //
+ .userName("user") //
+ .passWord("password") //
+ .build();
+
+ private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
+ new ImmutableFtpesConfig.Builder() //
+ .keyCert("/config/dfc.jks") //
+ .keyPassword("secret") //
+ .trustedCa("config/ftp.jks") //
+ .trustedCaPassword("secret") //
+ .build();
+
+ private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+ new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapTopicName("/publish/1") //
+ .dmaapUserPassword("password") //
+ .dmaapPortNumber(3907) //
+ .dmaapProtocol("https") //
+ .dmaapContentType("application/octet-stream") //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("user") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static EnvProperties properties() {
+ return ImmutableEnvProperties.builder() //
+ .consulHost("host") //
+ .consulPort(123) //
+ .cbsName("cbsName") //
+ .appName("appName") //
+ .build();
+ }
- private static AppConfig appConfigUnderTest;
+ private AppConfig appConfigUnderTest;
+ private CloudConfigurationProvider cloudConfigurationProvider = mock(CloudConfigurationProvider.class);
+ private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
- private static String filePath =
- Objects.requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile();
@BeforeEach
public void setUp() {
appConfigUnderTest = spy(AppConfig.class);
+ appConfigUnderTest.setCloudConfigurationProvider(cloudConfigurationProvider);
+ appConfigUnderTest.systemEnvironment = new Properties();
}
@Test
- public void whenApplicationWasStarted_FilePathIsSet() {
+ public void whenTheConfigurationFits() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
+ doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
+ appConfigUnderTest.initialize();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(0)).loadConfigurationFromFile();
- Assertions.assertEquals(filePath, appConfigUnderTest.getFilepath());
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
+
+ ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration();
+ Assertions.assertNotNull(consumerCfg);
+ assertThat(consumerCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG);
+ assertThat(consumerCfg).isEqualToComparingFieldByField(CORRECT_CONSUMER_CONFIG);
+
+ PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER);
+ Assertions.assertNotNull(publisherCfg);
+ assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG);
+ assertThat(publisherCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG);
+
+ FtpesConfig ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
+ assertThat(ftpesConfig).isNotNull();
+ assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION);
}
@Test
- public void whenTheConfigurationFits_GetFtpsAndDmaapObjectRepresentationConfiguration() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
-
+ public void whenTheConfigurationFits_twoProducers() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getCorrectJsonTwoProducers()).when(appConfigUnderTest).createInputStream(any());
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNotNull(appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getDmaapPublisherConfiguration(),
- appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getDmaapConsumerConfiguration(),
- appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getFtpesConfiguration(), appConfigUnderTest.getFtpesConfiguration());
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER));
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("XX_FILES"));
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("YY_FILES"));
+
+ assertThat(appConfigUnderTest.getPublisherConfiguration("XX_FILES").publishUrl())
+ .isEqualTo("feed01::publish_url");
+ assertThat(appConfigUnderTest.getPublisherConfiguration("YY_FILES").publishUrl())
+ .isEqualTo("feed01::publish_url");
}
@Test
- public void whenFileIsNotExist_ThrowIoException() {
+ public void whenFileIsNotExist_ThrowException() throws DatafileTaskException {
// Given
- filePath = "/temp.json";
- appConfigUnderTest.setFilepath(filePath);
+ appConfigUnderTest.setFilepath("/temp.json");
// When
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining("No PublishingConfiguration loaded, changeIdentifier: PM_MEAS_FILES");
+ Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
@Test
- public void whenFileIsExistsButJsonIsIncorrect() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(INCORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
+ public void whenFileIsExistsButJsonIsIncorrect() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getIncorrectJson()).when(appConfigUnderTest).createInputStream(any());
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining(CHANGE_IDENTIFIER);
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
-
}
-
@Test
- public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
+ public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException, DatafileTaskException {
+
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
JsonElement jsonElement = mock(JsonElement.class);
when(jsonElement.isJsonObject()).thenReturn(false);
doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class));
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining(CHANGE_IDENTIFIER);
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
- private String getJsonConfig(boolean correct) {
- JsonObject dmaapConsumerConfigData = new JsonObject();
- dmaapConsumerConfigData.addProperty("dmaapHostName", "localhost");
- dmaapConsumerConfigData.addProperty("dmaapPortNumber", 2222);
- dmaapConsumerConfigData.addProperty("dmaapTopicName", "/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
- dmaapConsumerConfigData.addProperty("dmaapProtocol", "http");
- dmaapConsumerConfigData.addProperty("dmaapUserName", "admin");
- dmaapConsumerConfigData.addProperty("dmaapUserPassword", "admin");
- dmaapConsumerConfigData.addProperty("dmaapContentType", "application/json");
- dmaapConsumerConfigData.addProperty("consumerId", "C12");
- dmaapConsumerConfigData.addProperty("consumerGroup", "OpenDcae-c12");
- dmaapConsumerConfigData.addProperty("timeoutMs", -1);
- dmaapConsumerConfigData.addProperty("messageLimit", 1);
-
- JsonObject dmaapProducerConfigData = new JsonObject();
- dmaapProducerConfigData.addProperty("dmaapHostName", "localhost");
- dmaapProducerConfigData.addProperty("dmaapPortNumber", 3907);
- dmaapProducerConfigData.addProperty("dmaapTopicName", "publish");
- dmaapProducerConfigData.addProperty("dmaapProtocol", "https");
- if (correct) {
- dmaapProducerConfigData.addProperty("dmaapUserName", "dradmin");
- dmaapProducerConfigData.addProperty("dmaapUserPassword", "dradmin");
- dmaapProducerConfigData.addProperty("dmaapContentType", "application/octet-stream");
- }
-
- JsonObject dmaapConfigs = new JsonObject();
- dmaapConfigs.add("dmaapConsumerConfiguration", dmaapConsumerConfigData);
- dmaapConfigs.add("dmaapProducerConfiguration", dmaapProducerConfigData);
-
- JsonObject ftpesConfigData = new JsonObject();
- ftpesConfigData.addProperty("keyCert", "config/dfc.jks");
- ftpesConfigData.addProperty("keyPassword", "secret");
- ftpesConfigData.addProperty("trustedCa", "config/ftp.jks");
- ftpesConfigData.addProperty("trustedCaPassword", "secret");
-
- JsonObject security = new JsonObject();
- security.addProperty("trustStorePath", "trustStorePath");
- security.addProperty("trustStorePasswordPath", "trustStorePasswordPath");
- security.addProperty("keyStorePath", "keyStorePath");
- security.addProperty("keyStorePasswordPath", "keyStorePasswordPath");
- security.addProperty("enableDmaapCertAuth", "enableDmaapCertAuth");
-
- JsonObject ftpesConfiguration = new JsonObject();
- ftpesConfiguration.add("ftpesConfiguration", ftpesConfigData);
-
- JsonObject configs = new JsonObject();
- configs.add("dmaap", dmaapConfigs);
- configs.add("ftp", ftpesConfiguration);
- configs.add("security", security);
-
- JsonObject completeJson = new JsonObject();
- completeJson.add("configs", configs);
-
- return completeJson.toString();
+ @Test
+ public void whenPeriodicConfigRefreshNoEnvironmentVariables() {
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNextCount(0) //
+ .verifyComplete();
+
+ assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshNoConsul() {
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+ Mono<JsonObject> err = Mono.error(new IOException());
+ doReturn(err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNextCount(0) //
+ .verifyComplete();
+
+ assertTrue(logAppender.list.toString()
+ .contains("Could not refresh application configuration java.io.IOException"));
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+
+ Mono<JsonObject> json = Mono.just(getJsonRootObject());
+
+ doReturn(json, json).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNext(appConfigUnderTest) //
+ .verifyComplete();
+
+ Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException {
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+
+ Mono<JsonObject> json = Mono.just(getJsonRootObject());
+ Mono<JsonObject> err = Mono.error(new IOException()); // no config entry created by the
+ // dmaap plugin
+
+ doReturn(json, err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNext(appConfigUnderTest) //
+ .verifyComplete();
+
+ Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ }
+
+ private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
+ JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
+ return rootObject;
+ }
+
+ private static InputStream getCorrectJson() throws IOException {
+ URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test.json");
+ String string = Resources.toString(url, Charsets.UTF_8);
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private static InputStream getCorrectJsonTwoProducers() throws IOException {
+ URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test_2producers.json");
+ String string = Resources.toString(url, Charsets.UTF_8);
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private static InputStream getIncorrectJson() {
+ String string = "{" + //
+ " \"configs\": {" + //
+ " \"dmaap\": {"; //
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
deleted file mode 100644
index 07233d95..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import com.google.gson.JsonObject;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
-
-class CloudConfigParserTest {
- private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
- new ImmutableDmaapConsumerConfiguration.Builder() //
- .timeoutMs(-1) //
- .dmaapHostName("message-router.onap.svc.cluster.local") //
- .dmaapUserName("admin") //
- .dmaapUserPassword("admin") //
- .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
- .dmaapPortNumber(2222) //
- .dmaapContentType("application/json") //
- .messageLimit(-1) //
- .dmaapProtocol("http") //
- .consumerId("C12") //
- .consumerGroup("OpenDCAE-c12") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
- new ImmutableDmaapPublisherConfiguration.Builder() //
- .dmaapTopicName("publish") //
- .dmaapUserPassword("dradmin") //
- .dmaapPortNumber(3907) //
- .dmaapProtocol("https") //
- .dmaapContentType("application/json") //
- .dmaapHostName("message-router.onap.svc.cluster.local") //
- .dmaapUserName("dradmin") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
- new ImmutableFtpesConfig.Builder() //
- .keyCert("/config/dfc.jks") //
- .keyPassword("secret") //
- .trustedCa("config/ftp.jks") //
- .trustedCaPassword("secret") //
- .build();
-
- private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
-
- @Test
- public void shouldCreateDmaapConsumerConfigurationCorrectly() {
- DmaapConsumerConfiguration dmaapConsumerConfig = cloudConfigParser.getDmaapConsumerConfig();
-
- assertThat(dmaapConsumerConfig).isNotNull();
- assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG);
- }
-
- @Test
- public void shouldCreateDmaapPublisherConfigurationCorrectly() {
- DmaapPublisherConfiguration dmaapPublisherConfig = cloudConfigParser.getDmaapPublisherConfig();
-
- assertThat(dmaapPublisherConfig).isNotNull();
- assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG);
- }
-
- @Test
- public void shouldCreateFtpesConfigurationCorrectly() {
- FtpesConfig ftpesConfig = cloudConfigParser.getFtpesConfig();
-
- assertThat(ftpesConfig).isNotNull();
- assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION);
- }
-
- public JsonObject getCloudConfigJsonObject() {
- JsonObject config = new JsonObject();
- config.addProperty("dmaap.dmaapConsumerConfiguration.timeoutMs", -1);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserName", "admin");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserPassword", "admin");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapTopicName",
- "/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapPortNumber", 2222);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapContentType", "application/json");
- config.addProperty("dmaap.dmaapConsumerConfiguration.messageLimit", -1);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapProtocol", "http");
- config.addProperty("dmaap.dmaapConsumerConfiguration.consumerId", "C12");
- config.addProperty("dmaap.dmaapConsumerConfiguration.consumerGroup", "OpenDCAE-c12");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapTopicName", "publish");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapProtocol", "https");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapContentType", "application/json");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapPortNumber", 3907);
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserName", "dradmin");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserPassword", "dradmin");
- config.addProperty("dmaap.ftpesConfig.keyCert", "/config/dfc.jks");
- config.addProperty("dmaap.ftpesConfig.keyPassword", "secret");
- config.addProperty("dmaap.ftpesConfig.trustedCa", "config/ftp.jks");
- config.addProperty("dmaap.ftpesConfig.trustedCaPassword", "secret");
-
- config.addProperty("dmaap.security.trustStorePath", "trustStorePath");
- config.addProperty("dmaap.security.trustStorePasswordPath", "trustStorePasswordPath");
- config.addProperty("dmaap.security.keyStorePath", "keyStorePath");
- config.addProperty("dmaap.security.keyStorePasswordPath", "keyStorePasswordPath");
- config.addProperty("dmaap.security.enableDmaapCertAuth", "true");
-
- return config;
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
index 6e2140b4..eba88c33 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -35,26 +36,40 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
+
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
+
import reactor.test.StepVerifier;
public class SchedulerConfigTest {
+ private final AppConfig appConfigurationMock = mock(AppConfig.class);
+ private final TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
+ private final ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
+ private final SchedulerConfig schedulerUnderTest =
+ spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock));
+
+ @BeforeEach
+ public void setUp() {
+ doNothing().when(appConfigurationMock).stop();
+ doNothing().when(appConfigurationMock).initialize();
+ }
+
@Test
public void getResponseFromCancellationOfTasks_success() {
+
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
scheduledFutureList.add(scheduledFutureMock);
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
- SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null);
-
String msg = "Datafile Service has already been stopped!";
StepVerifier.create(schedulerUnderTest.getResponseFromCancellationOfTasks())
.expectNext(new ResponseEntity<String>(msg, HttpStatus.CREATED)) //
@@ -68,24 +83,17 @@ public class SchedulerConfigTest {
@Test
public void tryToStartTaskWhenNotStarted_success() {
- TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
- ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
- CloudConfiguration cloudConfigurationMock = mock(CloudConfiguration.class);
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
SchedulerConfig schedulerUnderTestSpy =
- spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, cloudConfigurationMock));
+ spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock));
boolean actualResult = schedulerUnderTestSpy.tryToStartTask();
assertTrue(actualResult);
- ArgumentCaptor<Runnable> runTaskRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
- verify(taskSchedulerMock).scheduleAtFixedRate(runTaskRunnableCaptor.capture(), any(Instant.class),
- eq(Duration.ofMinutes(5)));
-
ArgumentCaptor<Runnable> scheduleMainDatafileEventTaskCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(taskSchedulerMock).scheduleWithFixedDelay(scheduleMainDatafileEventTaskCaptor.capture(),
eq(Duration.ofSeconds(15)));
@@ -100,22 +108,22 @@ public class SchedulerConfigTest {
verify(scheduledTasksMock).executeDatafileMainTask();
verifyNoMoreInteractions(scheduledTasksMock);
- runTaskRunnableCaptor.getValue().run();
- verify(cloudConfigurationMock).runTask();
- verifyNoMoreInteractions(cloudConfigurationMock);
+ verify(appConfigurationMock).initialize();
+ verifyNoMoreInteractions(appConfigurationMock);
- assertEquals(3, scheduledFutureList.size());
+ assertEquals(2, scheduledFutureList.size());
}
@Test
public void tryToStartTaskWhenAlreadyStarted_shouldReturnFalse() {
+ doNothing().when(appConfigurationMock).loadConfigurationFromFile();
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
scheduledFutureList.add(scheduledFutureMock);
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
- SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null);
+ SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, appConfigurationMock);
boolean actualResult = schedulerUnderTest.tryToStartTask();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
deleted file mode 100644
index 7f6b8c51..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.integration;
-
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.verify;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
-
-/**
- * Integration test for the ScheduledXmlContext.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-
-@Configuration
-@ComponentScan
-@ExtendWith({ SpringExtension.class })
-@ContextConfiguration(locations = { "classpath:scheduled-context.xml" })
-class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
-
- private static final int WAIT_FOR_SCHEDULING = 1;
-
- @Autowired
- private ScheduledTasks scheduledTask;
-
- @Test
- void testScheduling() {
- final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS);
- }
-
- private void verifyDmaapConsumerTask() {
- verify(scheduledTask, atLeast(1)).executeDatafileMainTask();
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
deleted file mode 100644
index 83c92ef4..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.model;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class FilePublishInformationTest {
- private static final String PRODUCT_NAME = "NrRadio";
- private static final String VENDOR_NAME = "Ericsson";
- private static final String LAST_EPOCH_MICROSEC = "8745745764578";
- private static final String SOURCE_NAME = "oteNB5309";
- private static final String START_EPOCH_MICROSEC = "8745745764578";
- private static final String TIME_ZONE_OFFSET = "UTC+05:00";
- private static final String NAME = "A20161224.1030-1045.bin.gz";
- private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz";
- private static final Path INTERNAL_LOCATION = Paths.get("target/A20161224.1030-1045.bin.gz");
- private static final String COMPRESSION = "gzip";
- private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
- private static final String FILE_FORMAT_VERSION = "V10";
-
- @Test
- public void filePublishInformationBuilder_shouldBuildAnObject() {
- FilePublishInformation filePublishInformation = ImmutableFilePublishInformation.builder() //
- .productName(PRODUCT_NAME) //
- .vendorName(VENDOR_NAME) //
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
- .sourceName(SOURCE_NAME) //
- .startEpochMicrosec(START_EPOCH_MICROSEC) //
- .timeZoneOffset(TIME_ZONE_OFFSET) //
- .name(NAME) //
- .location(LOCATION) //
- .internalLocation(INTERNAL_LOCATION) //
- .compression(COMPRESSION) //
- .fileFormatType(FILE_FORMAT_TYPE) //
- .fileFormatVersion(FILE_FORMAT_VERSION) //
- .context(new HashMap<String,String>()) //
- .build();
-
- Assertions.assertNotNull(filePublishInformation);
- Assertions.assertEquals(PRODUCT_NAME, filePublishInformation.getProductName());
- Assertions.assertEquals(VENDOR_NAME, filePublishInformation.getVendorName());
- Assertions.assertEquals(LAST_EPOCH_MICROSEC, filePublishInformation.getLastEpochMicrosec());
- Assertions.assertEquals(SOURCE_NAME, filePublishInformation.getSourceName());
- Assertions.assertEquals(START_EPOCH_MICROSEC, filePublishInformation.getStartEpochMicrosec());
- Assertions.assertEquals(TIME_ZONE_OFFSET, filePublishInformation.getTimeZoneOffset());
- Assertions.assertEquals(NAME, filePublishInformation.getName());
- Assertions.assertEquals(LOCATION, filePublishInformation.getLocation());
- Assertions.assertEquals(INTERNAL_LOCATION, filePublishInformation.getInternalLocation());
- Assertions.assertEquals(COMPRESSION, filePublishInformation.getCompression());
- Assertions.assertEquals(FILE_FORMAT_TYPE, filePublishInformation.getFileFormatType());
- Assertions.assertEquals(FILE_FORMAT_VERSION, filePublishInformation.getFileFormatVersion());
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index becfba31..8c7938bf 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -22,9 +22,12 @@ import static org.mockito.Mockito.spy;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
@@ -36,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
+
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -65,7 +69,7 @@ class JsonMessageParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_oneFileReadyMessage() {
+ void whenPassingCorrectJson_oneFileReadyMessage() throws URISyntaxException {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
index e21bbd7b..a71521cd 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.net.URI;
+
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
@@ -35,7 +35,9 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
+
import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
@@ -192,10 +194,4 @@ class DmaapProducerHttpClientTest {
Header[] authorizationHeaders = request.getHeaders("Authorization");
assertEquals(base64Creds, authorizationHeaders[0].getValue());
}
-
- @Test
- public void getBaseUri_success() {
- URI uri = producerClientUnderTestSpy.getBaseUri().build();
- assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString());
- }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
index 5e737253..574ad18e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
@@ -77,6 +77,7 @@ public class DMaaPMessageConsumerTest {
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<FilePublishInformation>();
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private DMaaPConsumerReactiveHttpClient httpClientMock;
@@ -173,6 +174,7 @@ public class DMaaPMessageConsumerTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.context(new HashMap<String,String>()) //
.build();
listOfFilePublishInformation.add(filePublishInformation);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 8f768d38..463c62c9 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -47,14 +47,12 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
import reactor.test.StepVerifier;
@@ -65,6 +63,7 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
class DataRouterPublisherTest {
+
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
private static final String LAST_EPOCH_MICROSEC = "8745745764578";
@@ -73,6 +72,7 @@ class DataRouterPublisherTest {
private static final String TIME_ZONE_OFFSET = "UTC+05:00";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private static final String COMPRESSION = "gzip";
private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
@@ -90,15 +90,17 @@ class DataRouterPublisherTest {
private static FilePublishInformation filePublishInformation;
private static DmaapProducerHttpClient httpClientMock;
private static AppConfig appConfig;
- private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
private static Map<String, String> context = new HashMap<>();
private static DataRouterPublisher publisherTaskUnderTestSpy;
+ // "https://54.45.333.2:1234/publish/1";
+ private static final String PUBLISH_URL =
+ HTTPS_SCHEME + "://" + HOST + ":" + PORT + "/" + PUBLISH_TOPIC + "/" + FEED_ID;
+
@BeforeAll
public static void setUp() {
- when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
- when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
- when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+ when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
filePublishInformation = ImmutableFilePublishInformation.builder() //
.productName(PRODUCT_NAME) //
@@ -114,6 +116,7 @@ class DataRouterPublisherTest {
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
.context(context) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
appConfig = mock(AppConfig.class);
publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
@@ -128,7 +131,6 @@ class DataRouterPublisherTest {
.verifyComplete();
ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
- verify(httpClientMock).getBaseUri();
verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
verifyNoMoreInteractions(httpClientMock);
@@ -138,6 +140,7 @@ class DataRouterPublisherTest {
assertEquals(HTTPS_SCHEME, actualUri.getScheme());
assertEquals(HOST, actualUri.getHost());
assertEquals(PORT, actualUri.getPort());
+
Path actualPath = Paths.get(actualUri.getPath());
assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
@@ -160,7 +163,8 @@ class DataRouterPublisherTest {
assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
- // Note that the following line checks the number of properties that are sent to the data router.
+ // Note that the following line checks the number of properties that are sent to the data
+ // router.
// This should be 10 unless the API is updated (which is the fields checked above)
assertEquals(10, metaHash.size());
}
@@ -185,7 +189,6 @@ class DataRouterPublisherTest {
.expectNext(filePublishInformation) //
.verifyComplete();
- verify(httpClientMock, times(2)).getBaseUri();
verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
verifyNoMoreInteractions(httpClientMock);
@@ -201,7 +204,6 @@ class DataRouterPublisherTest {
.expectErrorMessage("Retries exhausted: 1/1") //
.verify();
- verify(httpClientMock, times(2)).getBaseUri();
verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
verifyNoMoreInteractions(httpClientMock);
@@ -211,12 +213,9 @@ class DataRouterPublisherTest {
final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
throws Exception {
httpClientMock = mock(DmaapProducerHttpClient.class);
- when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
- doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration();
- doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient();
-
- UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
- when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+ when(appConfig.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
+ doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
+ doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(CHANGE_IDENTIFIER);
HttpResponse httpResponseMock = mock(HttpResponse.class);
if (exception == null) {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index cad3486d..299a0238 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -83,6 +83,7 @@ public class FileCollectorTest {
private static final String FTP_KEY_PASSWORD = "ftpKeyPassword";
private static final String TRUSTED_CA_PATH = "trustedCAPath";
private static final String TRUSTED_CA_PASSWORD = "trustedCAPassword";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private static AppConfig appConfigMock = mock(AppConfig.class);
private static FtpesConfig ftpesConfigMock = mock(FtpesConfig.class);
@@ -132,7 +133,8 @@ public class FileCollectorTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
- .context(new HashMap<String,String>())
+ .context(new HashMap<String,String>()) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
index 83643637..44755814 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
@@ -34,10 +34,9 @@ import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
@@ -47,55 +46,47 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
public class PublishedCheckerTest {
+ private static final String PUBLISH_URL = "https://54.45.33.2:1234/";
private static final String EMPTY_CONTENT = "[]";
- private static final String FEEDLOG_TOPIC = "feedlog";
- private static final String FEED_ID = "1";
- private static final String HTTPS_SCHEME = "https";
- private static final String HOST = "54.45.33.2";
- private static final int PORT = 1234;
private static final String SOURCE_NAME = "oteNB5309";
private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ private static final String LOG_URI = "https://localhost:3907/feedlog/1";
private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
- private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
private static AppConfig appConfigMock;
private DmaapProducerHttpClient httpClientMock = mock(DmaapProducerHttpClient.class);
private PublishedChecker publishedCheckerUnderTestSpy;
- /**
- * Sets up data for the tests.
- */
+
@BeforeAll
- public static void setUp() {
- when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
- when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
- when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+ public static void setUp() throws DatafileTaskException {
+ when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
appConfigMock = mock(AppConfig.class);
- when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ when(appConfigMock.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
}
@Test
public void executeWhenNotPublished_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
- verify(httpClientMock).getBaseUri();
verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), any(), any());
verifyNoMoreInteractions(httpClientMock);
@@ -103,22 +94,17 @@ public class PublishedCheckerTest {
HttpUriRequest getRequest = requestCaptor.getValue();
assertTrue(getRequest instanceof HttpGet);
URI actualUri = getRequest.getURI();
- assertEquals(HTTPS_SCHEME, actualUri.getScheme());
- assertEquals(HOST, actualUri.getHost());
- assertEquals(PORT, actualUri.getPort());
- Path actualPath = Paths.get(actualUri.getPath());
- assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString()));
- assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
- String actualQuery = actualUri.getQuery();
- assertTrue(actualQuery.contains("type=pub"));
- assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME));
+ // https://localhost:3907/feedlog/1?type=pub&filename=oteNB5309_A20161224.1030-1045.bin.gz
+ String expUri = LOG_URI + "?type=pub&filename=" + LOCAL_FILE_NAME;
+ assertEquals(expUri, actualUri.toString());
}
@Test
public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
}
@@ -127,7 +113,8 @@ public class PublishedCheckerTest {
public void executeWhenPublished_returnsTrue() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertTrue(isPublished);
}
@@ -136,7 +123,8 @@ public class PublishedCheckerTest {
public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
}
@@ -144,11 +132,9 @@ public class PublishedCheckerTest {
final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception {
publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock));
- doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration();
- doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient();
-
- UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
- when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+ doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
+ doReturn(LOG_URI).when(publisherConfigurationMock).logUrl();
+ doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient(publisherConfigurationMock);
HttpResponse httpResponseMock = mock(HttpResponse.class);
if (exception == null) {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 0d5a4231..a1021868 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -42,6 +42,11 @@ import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
@@ -51,8 +56,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformati
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -61,6 +64,7 @@ import reactor.test.StepVerifier;
public class ScheduledTasksTest {
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private AppConfig appConfig = mock(AppConfig.class);
private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
@@ -72,23 +76,33 @@ public class ScheduledTasksTest {
private DataRouterPublisher dataRouterMock;
private Map<String, String> contextMap = new HashMap<String, String>();
+ private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
+
@BeforeEach
- private void setUp() {
- DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
- .dmaapContentType("application/json") //
- .dmaapHostName("54.45.33.2") //
- .dmaapPortNumber(1234) //
- .dmaapProtocol("https") //
- .dmaapUserName("DFC") //
- .dmaapUserPassword("DFC") //
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ private void setUp() throws DatafileTaskException {
+ final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
+ .publishUrl(publishUrl) //
+ .logUrl("") //
+ .userName("userName") //
+ .passWord("passWord") //
.trustStorePath("trustStorePath") //
.trustStorePasswordPath("trustStorePasswordPath") //
.keyStorePath("keyStorePath") //
.keyStorePasswordPath("keyStorePasswordPath") //
.enableDmaapCertAuth(true) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
+ .topicUrl("topicUrl").trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
+ doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
+ doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
consumerMock = mock(DMaaPMessageConsumer.class);
publishedCheckerMock = mock(PublishedChecker.class);
@@ -109,7 +123,7 @@ public class ScheduledTasksTest {
.sourceName("") //
.startEpochMicrosec("") //
.timeZoneOffset("") //
- .changeIdentifier("") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.changeType("") //
.build();
}
@@ -164,11 +178,12 @@ public class ScheduledTasksTest {
.compression("") //
.fileFormatType("") //
.fileFormatVersion("") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.context(new HashMap<String, String>()).build();
}
@Test
- public void notingToConsume() {
+ public void notingToConsume() throws DatafileTaskException {
doReturn(consumerMock).when(testedObject).createConsumerTask();
doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
@@ -180,7 +195,7 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_successfulCase() {
+ public void consume_successfulCase() throws DatafileTaskException {
final int noOfEvents = 200;
final int noOfFilesPerEvent = 200;
final int noOfFiles = noOfEvents * noOfFilesPerEvent;
@@ -188,7 +203,7 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
@@ -212,11 +227,11 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_fetchFailedOnce() {
+ public void consume_fetchFailedOnce() throws DatafileTaskException {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
Mono<Object> error = Mono.error(new Exception("problem"));
@@ -246,12 +261,12 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_publishFailedOnce() {
+ public void consume_publishFailedOnce() throws DatafileTaskException {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
@@ -279,7 +294,7 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_successfulCase_sameFileNames() {
+ public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
final int noOfEvents = 1;
final int noOfFilesPerEvent = 100;
@@ -287,7 +302,7 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
@@ -303,7 +318,7 @@ public class ScheduledTasksTest {
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
- verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull());
+ verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints.json b/datafile-app-server/src/test/resources/datafile_endpoints.json
deleted file mode 100644
index 14dee368..00000000
--- a/datafile-app-server/src/test/resources/datafile_endpoints.json
+++ /dev/null
@@ -1,44 +0,0 @@
-{
- "configs": {
- "dmaap": {
- "dmaapConsumerConfiguration": {
- "consumerId": "C12",
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 2222,
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
- "dmaapProtocol": "http",
- "dmaapUserName": "admin",
- "dmaapUserPassword": "admin",
- "dmaapContentType": "application/json",
- "consumerGroup": "OpenDcae-c12",
- "timeoutMs": -1,
- "messageLimit": 1
- },
- "dmaapProducerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 3907,
- "dmaapProtocol": "https",
- "dmaapTopicName": "publish",
- "dmaapUserName": "dradmin",
- "dmaapUserPassword": "dradmin",
- "dmaapContentType": "application/octet-stream"
- }
- },
- "ftp": {
- "ftpesConfiguration": {
- "keyCert": "/config/dfc.jks",
- "keyPassword": "secret",
- "trustedCa": "/config/ftp.jks",
- "trustedCaPassword": "secret"
- }
- },
- "security": {
- "trustStorePath" : "trustStorePath",
- "trustStorePasswordPath" : "trustStorePasswordPath",
- "keyStorePath" : "keyStorePath",
- "keyStorePasswordPath" : "keyStorePasswordPath",
- "enableDmaapCertAuth" : "enableDmaapCertAuth"
- }
- }
-}
-
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test.json b/datafile-app-server/src/test/resources/datafile_endpoints_test.json
new file mode 100644
index 00000000..4d4d00ab
--- /dev/null
+++ b/datafile-app-server/src/test/resources/datafile_endpoints_test.json
@@ -0,0 +1,31 @@
+{
+ "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
+ "dmaap.ftpesConfig.keyPassword":"secret",
+ "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
+ "dmaap.ftpesConfig.trustedCaPassword":"secret",
+ "dmaap.security.trustStorePath":"trustStorePath",
+ "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
+ "dmaap.security.keyStorePath":"keyStorePath",
+ "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath",
+ "dmaap.security.enableDmaapCertAuth":"true",
+ "dmaap.dmaapProducerConfiguration": {
+ "changeIdentifier":"PM_MEAS_FILES",
+ "feedName":"feed00"
+ },
+ "streams_subscribes":{
+ "dmaap_subscriber":{
+ "dmmap_info":{
+ "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ },
+ "type":"message_router"
+ }
+ },
+ "feed00":{
+ "username":"user",
+ "log_url":"https://dmaap.example.com/feedlog/972",
+ "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1",
+ "location":"loc00",
+ "password":"password",
+ "publisher_id":"972.360gm"
+ }
+}
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
new file mode 100644
index 00000000..a7e2497d
--- /dev/null
+++ b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
@@ -0,0 +1,49 @@
+{
+ "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
+ "dmaap.ftpesConfig.keyPassword":"secret",
+ "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
+ "dmaap.ftpesConfig.trustedCaPassword":"secret",
+ "dmaap.security.trustStorePath":"trustStorePath",
+ "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
+ "dmaap.security.keyStorePath":"keyStorePath",
+ "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath",
+ "dmaap.security.enableDmaapCertAuth":"true",
+ "dmaap.dmaapProducerConfiguration":[
+ {
+ "changeIdentifier":"PM_MEAS_FILES",
+ "feedName":"feed00"
+ },
+ {
+ "changeIdentifier":"XX_FILES",
+ "feedName":"feed01"
+ },
+ {
+ "changeIdentifier":"YY_FILES",
+ "feedName":"feed01"
+ }
+ ],
+ "streams_subscribes":{
+ "dmaap_subscriber":{
+ "dmmap_info":{
+ "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ },
+ "type":"message_router"
+ }
+ },
+ "feed00":{
+ "username":"user",
+ "log_url":"https://dmaap.example.com/feedlog/972",
+ "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1",
+ "location":"loc00",
+ "password":"password",
+ "publisher_id":"972.360gm"
+ },
+ "feed01":{
+ "username":"user",
+ "log_url":"feed01::log_url",
+ "publish_url":"feed01::publish_url",
+ "location":"loc00",
+ "password":"",
+ "publisher_id":""
+ }
+}