summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--datafile-app-server/config/datafile_endpoints.json71
-rw-r--r--datafile-app-server/pom.xml13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java211
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java141
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java110
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java105
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java5
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java74
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java24
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java3
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java46
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java20
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java9
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java52
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java58
-rw-r--r--datafile-app-server/src/main/resources/datafile_endpoints.json37
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java356
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java133
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java38
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java63
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java72
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java6
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java10
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java33
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java4
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java62
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java61
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints.json44
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints_test.json31
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json49
43 files changed, 1097 insertions, 955 deletions
diff --git a/datafile-app-server/config/datafile_endpoints.json b/datafile-app-server/config/datafile_endpoints.json
index 833f1e91..cd1b502f 100644
--- a/datafile-app-server/config/datafile_endpoints.json
+++ b/datafile-app-server/config/datafile_endpoints.json
@@ -1,44 +1,33 @@
{
- "configs": {
- "dmaap": {
- "dmaapConsumerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 2222,
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
- "dmaapProtocol": "http",
- "dmaapUserName": "",
- "dmaapUserPassword": "",
- "dmaapContentType": "application/json",
- "consumerId": "C12",
- "consumerGroup": "OpenDcae-c12",
- "timeoutMs": -1,
- "messageLimit": 1
- },
- "dmaapProducerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 3907,
- "dmaapTopicName": "publish",
- "dmaapProtocol": "https",
- "dmaapUserName": "dradmin",
- "dmaapUserPassword": "dradmin",
- "dmaapContentType": "application/octet-stream"
- }
- },
- "ftp": {
- "ftpesConfiguration": {
- "keyCert": "config/dfc.jks",
- "keyPassword": "secret",
- "trustedCa": "config/ftp.jks",
- "trustedCaPassword": "secret"
- }
- },
- "security": {
- "trustStorePath" : "change it",
- "trustStorePasswordPath" : "change it",
- "keyStorePath" : "change it",
- "keyStorePasswordPath" : "change it",
- "enableDmaapCertAuth" : "false"
- }
- }
+ "//description":"This file is only used for testing purposes",
+ "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
+ "dmaap.ftpesConfig.keyPassword":"secret",
+ "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
+ "dmaap.ftpesConfig.trustedCaPassword":"secret",
+ "dmaap.security.trustStorePath":"change it",
+ "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
+ "dmaap.security.keyStorePath":"keyStorePath",
+ "dmaap.security.keyStorePasswordPath":"change it",
+ "dmaap.security.enableDmaapCertAuth":"false",
+ "dmaap.dmaapProducerConfiguration" : {
+ "changeIdentifier":"PM_MEAS_FILES",
+ "feedName":"feed00"
+ },
+ "streams_subscribes":{
+ "dmaap_subscriber":{
+ "dmmap_info":{
+ "topic_url":"http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ },
+ "type":"message_router"
+ }
+ },
+ "feed00":{
+ "username":"user",
+ "log_url":"https://localhost:3907/feedlog/1",
+ "publish_url":"https://localhost:3907/publish/1",
+ "location":"loc00",
+ "password":"dradmin",
+ "publisher_id":"972.360gm"
+ }
}
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 42bdd771..e3c60092 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -65,6 +65,19 @@
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-configuration-processor</artifactId>
+ <optional>true</optional>
+ </dependency>
<!--TESTS DEPENDENCIES -->
<dependency>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index a38eab8f..d324ca99 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -22,22 +22,38 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
import java.util.ServiceLoader;
+
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
/**
* Holds all configuration for the DFC.
*
@@ -46,105 +62,174 @@ import org.springframework.stereotype.Component;
*/
@Component
+@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
@EnableConfigurationProperties
@ConfigurationProperties("app")
public class AppConfig {
-
- private static final String CONFIG = "configs";
- private static final String DMAAP = "dmaap";
- private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
- private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
- private static final String FTP = "ftp";
- private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
- private static final String SECURITY = "security";
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
- private DmaapConsumerConfiguration dmaapConsumerConfiguration;
- private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private ConsumerConfiguration dmaapConsumerConfiguration;
+ private Map<String, PublisherConfiguration> publishingConfiguration;
private FtpesConfig ftpesConfiguration;
+ private CloudConfigurationProvider cloudConfigurationProvider;
+ @Value("#{systemEnvironment}")
+ Properties systemEnvironment;
+ private Disposable refreshConfigTask = null;
@NotEmpty
private String filepath;
- public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ @Autowired
+ public synchronized void setCloudConfigurationProvider(
+ CloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ this.cloudConfigurationProvider = reactiveCloudConfigurationProvider;
+ }
+
+ public synchronized void setFilepath(String filepath) {
+ this.filepath = filepath;
+ }
+
+ /**
+ * Reads the cloud configuration.
+ */
+ public void initialize() {
+ stop();
+ Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
+ loadConfigurationFromFile();
+
+ refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5))
+ .flatMap(count -> createRefreshConfigurationTask(count, context))
+ .subscribe(e -> logger.info("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ () -> logger.error("Configuration refresh terminated"));
+ }
+
+ public void stop() {
+ if (refreshConfigTask != null) {
+ refreshConfigTask.dispose();
+ refreshConfigTask = null;
+ }
+ }
+
+ public synchronized ConsumerConfiguration getDmaapConsumerConfiguration() {
return dmaapConsumerConfiguration;
}
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherConfiguration;
+ public synchronized boolean isFeedConfigured(String changeIdentifier) {
+ return publishingConfiguration.containsKey(changeIdentifier);
+ }
+
+ public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier)
+ throws DatafileTaskException {
+
+ if (publishingConfiguration == null) {
+ throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier);
+ }
+ PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier);
+ if (cfg == null) {
+ throw new DatafileTaskException(
+ "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier);
+ }
+ return cfg;
}
public synchronized FtpesConfig getFtpesConfiguration() {
return ftpesConfiguration;
}
+ Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) {
+ return Flux.just(counter) //
+ .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) //
+ .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) //
+ .flatMap(this::fetchConfiguration);
+ }
+
+ Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context)
+ .onErrorResume(AppConfig::onErrorResume);
+ }
+
+ private static <R> Mono<R> onErrorResume(Throwable trowable) {
+ logger.error("Could not refresh application configuration {}", trowable.toString());
+ return Mono.empty();
+ }
+
+ private Mono<AppConfig> fetchConfiguration(EnvProperties env) {
+ Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) //
+ .onErrorResume(AppConfig::onErrorResume);
+
+ // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the
+ // other ones does not work
+ EnvProperties dmaapEnv = ImmutableEnvProperties.builder() //
+ .consulHost(env.consulHost()) //
+ .consulPort(env.consulPort()) //
+ .cbsName(env.cbsName()) //
+ .appName(env.appName() + ":dmaap") //
+ .build(); //
+ Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv)
+ .onErrorResume(t -> Mono.just(new JsonObject()));
+
+ return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) //
+ .onErrorResume(AppConfig::onErrorResume);
+ }
+
+ /**
+ * parse configuration
+ *
+ * @param serviceConfigRootObject
+ * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken
+ * from the serviceConfigRootObject
+ * @return this which is updated if successful
+ */
+ private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) {
+ try {
+ CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
+ setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(),
+ parser.getFtpesConfig());
+ } catch (DatafileTaskException e) {
+ logger.error("Could not parse configuration {}", e.toString(), e);
+ }
+ return this;
+ }
+
/**
* Reads the configuration from file.
*/
- public void loadConfigurationFromFile() {
+ void loadConfigurationFromFile() {
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- JsonParser parser = new JsonParser();
- JsonObject jsonObject;
+
try (InputStream inputStream = createInputStream(filepath)) {
- JsonElement rootElement = getJsonElement(parser, inputStream);
- if (rootElement.isJsonObject()) {
- jsonObject = rootElement.getAsJsonObject();
- FtpesConfig ftpesConfig = deserializeType(gsonBuilder,
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
- FtpesConfig.class);
- DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder,
- concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_CONSUMER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapConsumerConfiguration.class);
-
- DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder,
- concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_PRODUCER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapPublisherConfiguration.class);
-
- setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
+ JsonParser parser = new JsonParser();
+ JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject();
+ if (rootObject == null) {
+ throw new JsonSyntaxException("Root is not a json object");
}
+ parseCloudConfig(rootObject, rootObject);
} catch (JsonSyntaxException | IOException e) {
- logger.error("Problem with loading configuration, file: {}", filepath, e);
+ logger.warn("Local configuration file not loaded: {}", filepath, e);
}
}
- synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration,
- DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) {
- this.dmaapConsumerConfiguration = consumerConfiguration;
- this.dmaapPublisherConfiguration = publisherConfiguration;
- this.ftpesConfiguration = ftpesConfig;
+ private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
+ Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
+ if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
+ logger.error(
+ "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
+ consumerConfiguration, publisherConfiguration, ftpesConfig);
+ } else {
+ this.dmaapConsumerConfiguration = consumerConfiguration;
+ this.publishingConfiguration = publisherConfiguration;
+ this.ftpesConfiguration = ftpesConfig;
+ }
}
JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
return parser.parse(new InputStreamReader(inputStream));
}
- private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
- @NotNull Class<T> type) {
- return gsonBuilder.create().fromJson(jsonObject, type);
- }
-
InputStream createInputStream(@NotNull String filepath) throws IOException {
return new BufferedInputStream(new FileInputStream(filepath));
}
- synchronized String getFilepath() {
- return this.filepath;
- }
-
- public synchronized void setFilepath(String filepath) {
- this.filepath = filepath;
- }
-
- private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
- source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue()));
- return target;
- }
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index 6b7860c4..3ac6b2c6 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -18,11 +18,17 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
@@ -32,63 +38,106 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class CloudConfigParser {
-
private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
- private final JsonObject jsonObject;
+ private final JsonObject serviceConfigurationRoot;
+ private final JsonObject dmaapConfigurationRoot;
- CloudConfigParser(JsonObject jsonObject) {
- this.jsonObject = jsonObject;
+ public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) {
+ this.serviceConfigurationRoot = serviceConfigurationRoot;
+ this.dmaapConfigurationRoot = dmaapConfigurationRoot;
}
- DmaapPublisherConfiguration getDmaapPublisherConfig() {
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
- .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
- .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
- .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
- .build();
+ public Map<String, PublisherConfiguration> getDmaapPublisherConfig() throws DatafileTaskException {
+ Iterator<JsonElement> producerCfgs =
+ toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator();
+
+ Map<String, PublisherConfiguration> result = new HashMap<>();
+
+ while (producerCfgs.hasNext()) {
+ JsonObject producerCfg = producerCfgs.next().getAsJsonObject();
+ String feedName = getAsString(producerCfg, "feedName");
+ JsonObject feedConfig = getFeedConfig(feedName);
+
+ PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
+ .publishUrl(getAsString(feedConfig, "publish_url")) //
+ .passWord(getAsString(feedConfig, "password")) //
+ .userName(getAsString(feedConfig, "username")) //
+ .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) //
+ .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
+ .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) //
+ .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
+ .enableDmaapCertAuth(
+ get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) //
+ .logUrl(getAsString(feedConfig, "log_url")) //
+ .build();
+
+ result.put(cfg.changeIdentifier(), cfg);
+ }
+ return result;
}
- DmaapConsumerConfiguration getDmaapConsumerConfig() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt())
- .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
- .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
- .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
- .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
- .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
- .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
+ JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject();
+ Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
+ if (topics.size() != 1) {
+ throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics);
+ }
+ JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
+ JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject();
+ String topicUrl = getAsString(dmaapInfo, "topic_url");
+
+ return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
+ .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH))
+ .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
+ .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH))
+ .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
+ .enableDmaapCertAuth(
+ get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
- FtpesConfig getFtpesConfig() {
+ public FtpesConfig getFtpesConfig() throws DatafileTaskException {
return new ImmutableFtpesConfig.Builder() //
- .keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString())
- .keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString())
- .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString())
- .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) //
+ .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert"))
+ .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword"))
+ .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa"))
+ .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) //
.build();
}
+
+ private static JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException {
+ JsonElement elem = obj.get(memberName);
+ if (elem == null) {
+ throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj);
+ }
+ return elem;
+ }
+
+ private static String getAsString(JsonObject obj, String memberName) throws DatafileTaskException {
+ return get(obj, memberName).getAsString();
+ }
+
+ private JsonObject getFeedConfig(String feedName) throws DatafileTaskException {
+ JsonElement elem = dmaapConfigurationRoot.get(feedName);
+ if (elem == null) {
+ elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under
+ // serviceConfigurationRoot
+ }
+ return elem.getAsJsonObject();
+ }
+
+ private static JsonArray toArray(JsonElement obj) {
+ if (obj.isJsonArray()) {
+ return obj.getAsJsonArray();
+ }
+ JsonArray arr = new JsonArray();
+ arr.add(obj);
+ return arr;
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
deleted file mode 100644
index 597f525f..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import com.google.gson.JsonObject;
-import java.util.Map;
-import java.util.Properties;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Primary;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-
-/**
- * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Configuration
-@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
-@EnableConfigurationProperties
-@EnableScheduling
-@Primary
-public class CloudConfiguration extends AppConfig {
- private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class);
- private ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider;
- private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
- private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration;
- private FtpesConfig ftpesCloudConfiguration;
-
- @Value("#{systemEnvironment}")
- private Properties systemEnvironment;
-
- @Autowired
- public synchronized void setThreadPoolTaskScheduler(
- ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
- this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
- }
-
- /**
- * Reads the cloud configuration.
- */
- public void runTask() {
- Map<String,String> context = MappedDiagnosticContext.initializeTraceContext();
- EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) //
- .subscribeOn(Schedulers.parallel()) //
- .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) //
- .flatMap(this::parseCloudConfig) //
- .subscribe(null, this::onError, this::onComplete);
- }
-
- private void onComplete() {
- logger.trace("Configuration updated");
- }
-
- private void onError(Throwable throwable) {
- logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
- }
-
- private synchronized Mono<CloudConfiguration> parseCloudConfig(JsonObject jsonObject) {
- logger.info("Received application configuration: {}", jsonObject);
- CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
- dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
- dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
- ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig();
- return Mono.just(this);
- }
-
- @Override
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration
- : super.getDmaapPublisherConfiguration();
- }
-
- @Override
- public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration
- : super.getDmaapConsumerConfiguration();
- }
-
- @Override
- public synchronized FtpesConfig getFtpesConfiguration() {
- return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration();
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
new file mode 100644
index 00000000..fc9ab204
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
@@ -0,0 +1,105 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+@Gson.TypeAdapters
+public abstract class ConsumerConfiguration {
+ @Value.Redacted
+ public abstract String topicUrl();
+
+ public abstract String trustStorePath();
+
+ public abstract String trustStorePasswordPath();
+
+ public abstract String keyStorePath();
+
+ public abstract String keyStorePasswordPath();
+
+ public abstract Boolean enableDmaapCertAuth();
+
+ public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException {
+ try {
+ URL url = new URL(topicUrl());
+ String passwd = "";
+ String userName = "";
+ if (url.getUserInfo() != null) {
+ String[] userInfo = url.getUserInfo().split(":");
+ userName = userInfo[0];
+ passwd = userInfo[1];
+ }
+ String urlPath = url.getPath();
+ DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+
+ return new ImmutableDmaapConsumerConfiguration.Builder() //
+ .dmaapContentType("application/json") //
+ .dmaapPortNumber(url.getPort()) //
+ .dmaapHostName(url.getHost()) //
+ .dmaapTopicName(path.dmaapTopicName) //
+ .dmaapProtocol(url.getProtocol()) //
+ .dmaapUserName(userName) //
+ .dmaapUserPassword(passwd) //
+ .trustStorePath(this.trustStorePath()) //
+ .trustStorePasswordPath(this.trustStorePasswordPath()) //
+ .keyStorePath(this.keyStorePath()) //
+ .keyStorePasswordPath(this.keyStorePasswordPath()) //
+ .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
+ .consumerId(path.consumerId) //
+ .consumerGroup(path.consumerGroup) //
+ .timeoutMs(-1) //
+ .messageLimit(-1) //
+ .build();
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Could not parse the URL", e);
+ }
+ }
+
+ private class DmaapConsumerUrlPath {
+ final String dmaapTopicName;
+ final String consumerGroup;
+ final String consumerId;
+
+ DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+ this.dmaapTopicName = dmaapTopicName;
+ this.consumerGroup = consumerGroup;
+ this.consumerId = consumerId;
+ }
+ }
+
+ private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException {
+ String[] tokens = urlPath.split("/"); // UrlPath: /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
+ if (tokens.length != 5) {
+ throw new DatafileTaskException("The path has incorrect syntax: " + urlPath);
+ }
+
+ final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // ex. // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
+ final String consumerGroup = tokens[3]; // ex. OpenDcae-c12
+ final String consumerId = tokens[4]; // ex. C12
+ return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index 71003f80..62af92a8 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -19,12 +19,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+
import reactor.core.publisher.Mono;
/**
@@ -53,7 +55,7 @@ class EnvironmentProcessor {
} catch (EnvironmentLoaderException e) {
return Mono.error(e);
}
- logger.info("Evaluated environment system variables {}", envProperties);
+ logger.trace("Evaluated environment system variables {}", envProperties);
return Mono.just(envProperties);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
index 3f029359..844699eb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
import java.io.Serializable;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.springframework.stereotype.Component;
@@ -28,7 +29,7 @@ import org.springframework.stereotype.Component;
@Component
@Value.Immutable
-@Value.Style(builder = "new")
+@Value.Style(builder = "new", redactedMask = "####")
@Gson.TypeAdapters
public abstract class FtpesConfig implements Serializable {
@@ -38,11 +39,13 @@ public abstract class FtpesConfig implements Serializable {
public abstract String keyCert();
@Value.Parameter
+ @Value.Redacted
public abstract String keyPassword();
@Value.Parameter
public abstract String trustedCa();
@Value.Parameter
+ @Value.Redacted
public abstract String trustedCaPassword();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
new file mode 100644
index 00000000..5576ed26
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
@@ -0,0 +1,74 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+@Gson.TypeAdapters
+public interface PublisherConfiguration {
+
+ String publishUrl();
+
+ String logUrl();
+
+ String userName();
+
+ @Value.Redacted
+ String passWord();
+
+ String trustStorePath();
+
+ String trustStorePasswordPath();
+
+ String keyStorePath();
+
+ String keyStorePasswordPath();
+
+ Boolean enableDmaapCertAuth();
+
+ String changeIdentifier();
+
+ default DmaapPublisherConfiguration toDmaap() throws MalformedURLException {
+ URL url = new URL(publishUrl());
+ String urlPath = url.getPath();
+
+ return new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapContentType("application/octet-stream") //
+ .dmaapPortNumber(url.getPort()) //
+ .dmaapHostName(url.getHost()) //
+ .dmaapTopicName(urlPath) //
+ .dmaapProtocol(url.getProtocol()) //
+ .dmaapUserName(this.userName()) //
+ .dmaapUserPassword(this.passWord()) //
+ .trustStorePath(this.trustStorePath()) //
+ .trustStorePasswordPath(this.trustStorePasswordPath()) //
+ .keyStorePath(this.keyStorePath()) //
+ .keyStorePasswordPath(this.keyStorePasswordPath()) //
+ .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
+ .build();
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index b78e4ae5..5835de1a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,7 +16,6 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import io.swagger.annotations.ApiOperation;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -24,7 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.slf4j.Logger;
@@ -36,6 +37,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
+import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
@@ -49,7 +52,6 @@ import reactor.core.publisher.Mono;
public class SchedulerConfig {
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
- private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
@@ -57,21 +59,21 @@ public class SchedulerConfig {
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
- private final CloudConfiguration cloudConfiguration;
+ private final AppConfig configuration;
/**
* Constructor.
*
* @param taskScheduler The scheduler used to schedule the tasks.
* @param scheduledTasks The scheduler that will actually handle the tasks.
- * @param cloudConfiguration The DFC configuration.
+ * @param configuration The DFC configuration.
*/
@Autowired
public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks,
- CloudConfiguration cloudConfiguration) {
+ AppConfig configuration) {
this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTasks;
- this.cloudConfiguration = cloudConfiguration;
+ this.configuration = configuration;
}
/**
@@ -83,6 +85,7 @@ public class SchedulerConfig {
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
+ configuration.stop();
MDC.setContextMap(contextMap);
logger.info("Stopped Datafile workflow");
MDC.clear();
@@ -99,12 +102,11 @@ public class SchedulerConfig {
public synchronized boolean tryToStartTask() {
contextMap = MappedDiagnosticContext.initializeTraceContext();
logger.info("Start scheduling Datafile workflow");
+ configuration.initialize();
+
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask,
- Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(
- taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
- SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
scheduledFutureList
.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
index 71242265..5a78261a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
@@ -62,7 +62,7 @@ public class SwaggerConfig extends WebMvcConfigurationSupport {
.build();
}
- private ApiInfo apiInfo() {
+ private static ApiInfo apiInfo() {
return new ApiInfoBuilder() //
.title(API_TITLE) //
.description(DESCRIPTION) //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
index cbd67297..847ca46e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java
@@ -41,7 +41,7 @@ public class TomcatHttpConfig {
return tomcat;
}
- private Connector getHttpConnector() {
+ private static Connector getHttpConnector() {
Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL);
connector.setScheme("http");
connector.setPort(8100);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 4716fa87..791f0cf1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -71,7 +71,7 @@ public class ScheduleController {
public Mono<ResponseEntity<String>> startTasks() {
return Mono.fromSupplier(schedulerConfig::tryToStartTask) //
- .map(this::createStartTaskResponse);
+ .map(ScheduleController::createStartTaskResponse);
}
/**
@@ -90,7 +90,7 @@ public class ScheduleController {
}
@ApiOperation(value = "Sends success or error response on starting task execution")
- private ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) {
+ private static ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) {
if (wasScheduled) {
return new ResponseEntity<>("Datafile Service has been started!", HttpStatus.CREATED);
} else {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
index 4c49dd8a..72623db2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -17,6 +17,7 @@
package org.onap.dcaegen2.collectors.datafile.ftp;
import java.util.Optional;
+
import org.immutables.value.Value;
/**
@@ -26,11 +27,13 @@ import org.immutables.value.Value;
*
*/
@Value.Immutable
+@Value.Style(redactedMask = "####")
public interface FileServerData {
public String serverAddress();
public String userId();
+ @Value.Redacted
public String password();
public Optional<Integer> port();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index c78ae3a3..bb3016ce 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -128,7 +128,7 @@ public class FtpsClient implements FileCollectClient {
logger.trace("collectFile fetched: {}", localFileName);
}
- private int getPort(Optional<Integer> port) {
+ private static int getPort(Optional<Integer> port) {
return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
}
@@ -180,7 +180,7 @@ public class FtpsClient implements FileCollectClient {
logger.warn("Local file {} already created", localFileName);
}
OutputStream output = new FileOutputStream(localFile);
- logger.debug("File {} opened xNF", localFileName);
+ logger.trace("File {} opened xNF", localFileName);
return output;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index 333be92a..bdaf6d4c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -55,7 +55,7 @@ public class SftpClient implements FileCollectClient {
try {
sftpChannel.get(remoteFile, localFile.toString());
- logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ logger.trace("File {} Download Successfull from xNF", localFile.getFileName());
} catch (SftpException e) {
boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED;
throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry);
@@ -90,11 +90,11 @@ public class SftpClient implements FileCollectClient {
}
}
- private int getPort(Optional<Integer> port) {
+ private static int getPort(Optional<Integer> port) {
return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
}
- private Session setUpSession(FileServerData fileServerData) throws JSchException {
+ private static Session setUpSession(FileServerData fileServerData) throws JSchException {
JSch jsch = new JSch();
Session newSession =
@@ -105,7 +105,7 @@ public class SftpClient implements FileCollectClient {
return newSession;
}
- private ChannelSftp getChannel(Session session) throws JSchException {
+ private static ChannelSftp getChannel(Session session) throws JSchException {
Channel channel = session.openChannel("sftp");
channel.connect();
return (ChannelSftp) channel;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 0a6b669c..36aae949 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -38,6 +38,7 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
*/
@Value.Immutable
@Gson.TypeAdapters
+@Value.Style(redactedMask = "####")
public abstract class FileData {
public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
@@ -53,6 +54,7 @@ public abstract class FileData {
*
* @return the URL to use to fetch the file from the PNF
*/
+ @Value.Redacted
public abstract String location();
/**
@@ -123,7 +125,7 @@ public abstract class FileData {
* @return An <code>Optional</code> containing a String array with the user name and password if given, or an empty
* <code>Optional</code> if not given.
*/
- private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+ private static Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
if (userInfoString != null) {
String[] userAndPassword = userInfoString.split(":");
if (userAndPassword.length == 2) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
index 63ed0daa..5b8c015e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
@@ -33,6 +33,7 @@ import org.immutables.value.Value;
@Value.Immutable
@Gson.TypeAdapters
+@Value.Style(redactedMask = "####")
public interface FilePublishInformation {
@SerializedName("productName")
@@ -54,6 +55,7 @@ public interface FilePublishInformation {
String getTimeZoneOffset();
@SerializedName("location")
+ @Value.Redacted
String getLocation();
@SerializedName("compression")
@@ -70,4 +72,6 @@ public interface FilePublishInformation {
String getName();
Map<String, String> getContext();
+
+ String getChangeIdentifier();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
index 7081d1ac..b8df125b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java
@@ -31,12 +31,12 @@ import java.util.Set;
*/
public abstract class JsonSerializer {
+ private JsonSerializer() {}
- private static Gson gson =
- new GsonBuilder() //
- .serializeNulls() //
- .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) //
- .create(); //
+ private static Gson gson = new GsonBuilder() //
+ .serializeNulls() //
+ .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) //
+ .create(); //
/**
* Serializes a <code>filePublishInformation</code>.
@@ -56,9 +56,10 @@ public abstract class JsonSerializer {
private final Set<String> inclusions =
Sets.newHashSet("productName", "vendorName", "lastEpochMicrosec", "sourceName", "startEpochMicrosec",
"timeZoneOffset", "location", "compression", "fileFormatType", "fileFormatVersion");
+
@Override
- public boolean shouldSkipField(FieldAttributes f) {
- return !inclusions.contains(f.getName());
+ public boolean shouldSkipField(FieldAttributes fieldAttributes) {
+ return !inclusions.contains(fieldAttributes.getName());
}
@Override
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 3a3eb3aa..470c4e73 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -22,11 +22,13 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
+
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -37,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -70,7 +73,6 @@ public class JsonMessageParser {
private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
private static final String FILE_READY_CHANGE_TYPE = "FileReady";
- private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
/**
* The data types available in the event name.
@@ -92,7 +94,7 @@ public class JsonMessageParser {
* @return a <code>Flux</code> containing messages.
*/
public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
+ return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData);
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -123,18 +125,17 @@ public class JsonMessageParser {
: getMessagesFromJsonArray(jsonElement);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- logger.trace("original message from message router: {}", message);
+ private static Mono<JsonElement> getJsonParserMessage(String message) {
return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
+ private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
: logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
- private Mono<FileReadyMessage> transformMessages(JsonObject message) {
+ private static Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
@@ -159,7 +160,7 @@ public class JsonMessageParser {
}
- private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
+ private static Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
@@ -182,15 +183,15 @@ public class JsonMessageParser {
.changeIdentifier(changeIdentifier) //
.changeType(changeType) //
.build();
- if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
+ if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
String errorMessage = "VES event parsing.";
if (!missingValues.isEmpty()) {
errorMessage += " Missing data: " + missingValues;
}
- if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) {
- errorMessage += " Change identifier or change type is wrong.";
+ if (!isChangeTypeCorrect(changeType)) {
+ errorMessage += " Change type is wrong: " + changeType + " expected: " + FILE_READY_CHANGE_TYPE;
}
errorMessage += " Message: {}";
logger.error(errorMessage, message);
@@ -198,15 +199,12 @@ public class JsonMessageParser {
}
}
- private boolean isChangeTypeCorrect(String changeType) {
+ private static boolean isChangeTypeCorrect(String changeType) {
return FILE_READY_CHANGE_TYPE.equals(changeType);
}
- private boolean isChangeIdentifierCorrect(String changeIdentifier) {
- return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
- }
-
- private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) {
+ private static List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields,
+ MessageMetaData messageMetaData) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
@@ -219,7 +217,7 @@ public class JsonMessageParser {
return res;
}
- private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
+ private static Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
@@ -250,15 +248,17 @@ public class JsonMessageParser {
}
/**
- * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description},
- * example: Noti_RnNode-Ericsson_FileReady
+ * Gets data from the event name. Defined as:
+ * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
+ * Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
* @param eventName The event name to get the data from.
* @param missingValues List of missing values. The dataType will be added if missing.
* @return String of data from event name
*/
- private String getDataFromEventName(EventNameDataType dataType, String eventName, List<String> missingValues) {
+ private static String getDataFromEventName(EventNameDataType dataType, String eventName,
+ List<String> missingValues) {
String[] eventArray = eventName.split("_|-");
if (eventArray.length >= 4) {
return eventArray[dataType.index];
@@ -269,7 +269,7 @@ public class JsonMessageParser {
return "";
}
- private String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
+ private static String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
if (jsonObject.has(jsonKey)) {
return jsonObject.get(jsonKey).getAsString();
} else {
@@ -278,11 +278,11 @@ public class JsonMessageParser {
}
}
- private boolean containsNotificationFields(JsonObject jsonObject) {
+ private static boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
+ private static Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
logger.error(errorMessage);
return Flux.empty();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 2b3a0ef6..ff267815 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -67,7 +67,7 @@ public class PublishedFileCache {
return publishedFiles.size();
}
- private boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
+ private static boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
final int timeToKeepInfoInSeconds = 60 * 60 * 24;
return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
index 8d433827..198c1bf1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
@@ -27,7 +27,9 @@ import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Future;
+
import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
@@ -44,8 +46,6 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
/**
* Client used to send requests to DataRouter.
@@ -139,22 +139,8 @@ public class DmaapProducerHttpClient {
request.addHeader("Authorization", "Basic " + base64Creds);
}
- /**
- * Gets a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. Specific parts can then be
- * added to the URI by the user.
- *
- * @return a <code>UriBuilder</code> containing the base URI needed talk to DataRouter.
- */
- public UriBuilder getBaseUri() {
- return new DefaultUriBuilderFactory().builder() //
- .scheme(configuration.dmaapProtocol()) //
- .host(configuration.dmaapHostName()) //
- .port(configuration.dmaapPortNumber());
- }
-
private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout,
- Map<String, String> contextMap)
- throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ Map<String, String> contextMap) throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
SSLContext sslContext =
new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index e50ef580..f1d33454 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
@@ -29,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -43,7 +45,7 @@ public class DMaaPMessageConsumer {
private final JsonMessageParser jsonMessageParser;
private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException {
this.jsonMessageParser = new JsonMessageParser();
this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
@@ -69,8 +71,9 @@ public class DMaaPMessageConsumer {
return jsonMessageParser.getMessagesFromJson(message);
}
- private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
- DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
+ private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig)
+ throws DatafileTaskException {
+ DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap();
WebClient client = new DmaapWebClient().fromConfiguration(config).build();
return new DMaaPConsumerReactiveHttpClient(config, client);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index e5dd01e9..1d6baa65 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -25,6 +25,7 @@ import com.google.gson.JsonParser;
import java.io.IOException;
import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
@@ -34,6 +35,8 @@ import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
@@ -46,6 +49,7 @@ import org.slf4j.MDC;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -58,12 +62,9 @@ import reactor.core.publisher.Mono;
public class DataRouterPublisher {
private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
private static final String CONTENT_TYPE = "application/octet-stream";
- private static final String PUBLISH_TOPIC = "publish";
- private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
- private DmaapProducerHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -80,7 +81,6 @@ public class DataRouterPublisher {
public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries,
Duration firstBackoff) {
MDC.setContextMap(publishInfo.getContext());
- dmaapProducerReactiveHttpClient = resolveClient();
return Mono.just(publishInfo) //
.cache() //
.flatMap(this::publishFile) //
@@ -92,13 +92,14 @@ public class DataRouterPublisher {
MDC.setContextMap(publishInfo.getContext());
logger.trace("Entering publishFile with {}", publishInfo);
try {
+ DmaapProducerHttpClient dmaapProducerHttpClient = resolveClient(publishInfo.getChangeIdentifier());
HttpPut put = new HttpPut();
prepareHead(publishInfo, put);
prepareBody(publishInfo, put);
- dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+ dmaapProducerHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
- dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
+ dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
@@ -107,11 +108,18 @@ public class DataRouterPublisher {
}
}
- private void prepareHead(FilePublishInformation publishInfo, HttpPut put) {
+ private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException {
+
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getPublishUri(publishInfo.getName()));
+ URI uri = new DefaultUriBuilderFactory(
+ datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) //
+ .builder() //
+ .pathSegment(publishInfo.getName()) //
+ .build();
+ put.setURI(uri);
+
MappedDiagnosticContext.appendTraceInfo(put);
}
@@ -122,14 +130,7 @@ public class DataRouterPublisher {
}
}
- private URI getPublishUri(String fileName) {
- return dmaapProducerReactiveHttpClient.getBaseUri() //
- .pathSegment(PUBLISH_TOPIC) //
- .pathSegment(DEFAULT_FEED_ID) //
- .pathSegment(fileName).build();
- }
-
- private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
+ private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
@@ -145,11 +146,17 @@ public class DataRouterPublisher {
return realResource.getInputStream();
}
- DmaapPublisherConfiguration resolveConfiguration() {
- return datafileAppConfig.getDmaapPublisherConfiguration();
+ PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException {
+ return datafileAppConfig.getPublisherConfiguration(changeIdentifer);
}
- DmaapProducerHttpClient resolveClient() {
- return new DmaapProducerHttpClient(resolveConfiguration());
+ DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException {
+ try {
+ DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap();
+ return new DmaapProducerHttpClient(cfg);
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Cannot resolve producer client", e);
+ }
+
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 0c62795e..6ddcb541 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -74,12 +74,12 @@ public class FileCollector {
return Mono.just(fileData) //
.cache() //
- .flatMap(fd -> collectFile(fileData, contextMap)) //
+ .flatMap(fd -> tryCollectFile(fileData, contextMap)) //
.retryBackoff(numRetries, firstBackoff) //
- .flatMap(this::checkCollectedFile);
+ .flatMap(FileCollector::checkCollectedFile);
}
- private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
+ private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
if (info.isPresent()) {
return Mono.just(info.get());
} else {
@@ -88,7 +88,7 @@ public class FileCollector {
}
}
- private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) {
+ private Mono<Optional<FilePublishInformation>> tryCollectFile(FileData fileData, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("starting to collectFile {}", fileData.name());
@@ -110,7 +110,7 @@ public class FileCollector {
}
} catch (Exception throwable) {
logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
- throwable.toString());
+ throwable.toString(), throwable);
return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
}
}
@@ -126,7 +126,7 @@ public class FileCollector {
}
}
- private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
+ private static FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
Map<String, String> context) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
@@ -143,6 +143,7 @@ public class FileCollector {
.compression(fileData.compression()) //
.fileFormatType(fileData.fileFormatType()) //
.fileFormatVersion(fileData.fileFormatVersion()) //
+ .changeIdentifier(fileData.messageMetaData().changeIdentifier()) //
.context(context) //
.build();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index e18da248..4d8d679d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -21,18 +21,23 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Map;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -44,12 +49,9 @@ import org.slf4j.MDC;
*
*/
public class PublishedChecker {
- private static final String FEEDLOG_TOPIC = "feedlog";
- private static final String DEFAULT_FEED_ID = "1";
- private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
+ private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
private final AppConfig appConfig;
/**
@@ -66,19 +68,24 @@ public class PublishedChecker {
*
* @param fileName the name of the file used when it is published.
*
- * @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
+ * @return <code>true</code> if the file has been published before, <code>false</code>
+ * otherwise.
+ * @throws DatafileTaskException if the check fails
*/
- public boolean isFilePublished(String fileName, Map<String, String> contextMap) {
+ public boolean isFilePublished(String fileName, String changeIdentifier, Map<String, String> contextMap)
+ throws DatafileTaskException {
MDC.setContextMap(contextMap);
- DmaapProducerHttpClient producerClient = resolveClient();
+ PublisherConfiguration publisherConfig = resolveConfiguration(changeIdentifier);
+
+ DmaapProducerHttpClient producerClient = resolveClient(publisherConfig);
HttpGet getRequest = new HttpGet();
MappedDiagnosticContext.appendTraceInfo(getRequest);
- getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
- producerClient.addUserCredentialsToHead(getRequest);
-
try {
+ getRequest.setURI(getPublishedQueryUri(fileName, publisherConfig));
+ producerClient.addUserCredentialsToHead(getRequest);
+
HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest,
WEB_CLIENT_TIMEOUT, contextMap);
@@ -95,20 +102,23 @@ public class PublishedChecker {
}
}
- private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) {
- return producerClient.getBaseUri() //
- .pathSegment(FEEDLOG_TOPIC) //
- .pathSegment(DEFAULT_FEED_ID) //
- .queryParam("type", "pub") //
- .queryParam("filename", fileName) //
+ private static URI getPublishedQueryUri(String fileName, PublisherConfiguration config) throws URISyntaxException {
+ return new URIBuilder(config.logUrl()) //
+ .addParameter("type", "pub") //
+ .addParameter("filename", fileName) //
.build();
}
- protected DmaapPublisherConfiguration resolveConfiguration() {
- return appConfig.getDmaapPublisherConfiguration();
+ protected PublisherConfiguration resolveConfiguration(String changeIdentifier) throws DatafileTaskException {
+ return appConfig.getPublisherConfiguration(changeIdentifier);
}
- protected DmaapProducerHttpClient resolveClient() {
- return new DmaapProducerHttpClient(resolveConfiguration());
+ protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig)
+ throws DatafileTaskException {
+ try {
+ return new DmaapProducerHttpClient(publisherConfig.toDmaap());
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Cannot create published checker client", e);
+ }
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index b5fa0c24..bac52659 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -86,13 +87,16 @@ public class ScheduledTasks {
threadPoolQueueSize.get());
return;
}
+ if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) {
+ logger.warn("No configuration loaded, skipping polling for messages");
+ return;
+ }
currentNumberOfSubscriptions.incrementAndGet();
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
- applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
- .subscribe(this::onSuccess, //
+ .subscribe(ScheduledTasks::onSuccess, //
throwable -> {
onError(throwable, context);
currentNumberOfSubscriptions.decrementAndGet();
@@ -115,6 +119,7 @@ public class ScheduledTasks {
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> createMdcContext(fileData, context)) //
+ .filter(this::isFeedConfigured) //
.filter(this::shouldBePublished) //
.flatMap(this::fetchFile, false, 1, 1) //
.flatMap(this::publishToDataRouter, false, 1, 1) //
@@ -124,13 +129,13 @@ public class ScheduledTasks {
}
private class FileDataWithContext {
- FileDataWithContext(FileData fileData, Map<String, String> context) {
+ public final FileData fileData;
+ public final Map<String, String> context;
+
+ public FileDataWithContext(FileData fileData, Map<String, String> context) {
this.fileData = fileData;
this.context = context;
}
-
- final FileData fileData;
- final Map<String, String> context;
}
/**
@@ -160,7 +165,7 @@ public class ScheduledTasks {
return this.threadPoolQueueSize.get();
}
- protected DMaaPMessageConsumer createConsumerTask() {
+ protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
@@ -172,17 +177,17 @@ public class ScheduledTasks {
return new DataRouterPublisher(applicationConfiguration);
}
- private void onComplete(Map<String, String> contextMap) {
+ private static void onComplete(Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.trace("Datafile tasks have been completed");
}
- private synchronized void onSuccess(FilePublishInformation publishInfo) {
+ private static synchronized void onSuccess(FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
logger.info("Datafile file published {}", publishInfo.getInternalLocation());
}
- private void onError(Throwable throwable, Map<String, String> context) {
+ private static void onError(Throwable throwable, Map<String, String> context) {
MDC.setContextMap(context);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
@@ -194,11 +199,26 @@ public class ScheduledTasks {
return Mono.just(pair);
}
+ private boolean isFeedConfigured(FileDataWithContext fileData) {
+ if (applicationConfiguration.isFeedConfigured(fileData.fileData.messageMetaData().changeIdentifier())) {
+ return true;
+ } else {
+ logger.info("No feed is configured for: {}, file ignored: {}",
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name());
+ return false;
+ }
+ }
+
private boolean shouldBePublished(FileDataWithContext fileData) {
boolean result = false;
Path localFilePath = fileData.fileData.getLocalFilePath();
if (publishedFilesCache.put(localFilePath) == null) {
- result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context);
+ try {
+ result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
+ } catch (DatafileTaskException e) {
+ logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
+ }
}
if (!result) {
currentNumberOfTasks.decrementAndGet();
@@ -248,13 +268,19 @@ public class ScheduledTasks {
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
logger.info(
- "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}",
+ "Consuming new file ready messages, current number of tasks: {}, published files: {}, "
+ + "number of subscriptions: {}",
getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
Map<String, String> context = MDC.getCopyOfContextMap();
- return createConsumerTask() //
- .getMessageRouterResponse() //
- .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ try {
+ return createConsumerTask() //
+ .getMessageRouterResponse() //
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ } catch (Exception e) {
+ logger.error("Could not create message consumer task", e);
+ return Flux.empty();
+ }
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
@@ -264,7 +290,7 @@ public class ScheduledTasks {
return Flux.empty();
}
- private void deleteFile(Path localFile, Map<String, String> context) {
+ private static void deleteFile(Path localFile, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("Deleting file: {}", localFile);
try {
diff --git a/datafile-app-server/src/main/resources/datafile_endpoints.json b/datafile-app-server/src/main/resources/datafile_endpoints.json
deleted file mode 100644
index 8d45bc84..00000000
--- a/datafile-app-server/src/main/resources/datafile_endpoints.json
+++ /dev/null
@@ -1,37 +0,0 @@
-{
- "configs": {
- "dmaap": {
- "dmaapConsumerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 2222,
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
- "dmaapProtocol": "http",
- "dmaapUserName": "",
- "dmaapUserPassword": "",
- "dmaapContentType": "application/json",
- "consumerId": "C12",
- "consumerGroup": "OpenDcae-c12",
- "timeoutMs": -1,
- "messageLimit": 1
- },
- "dmaapProducerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 3907,
- "dmaapTopicName": "publish",
- "dmaapProtocol": "https",
- "dmaapUserName": "dradmin",
- "dmaapUserPassword": "dradmin",
- "dmaapContentType": "application/octet-stream"
- }
- },
- "ftp": {
- "ftpesConfiguration": {
- "keyCert": "config/dfc.jks",
- "keyPassword": "secret",
- "trustedCa": "config/ftp.jks",
- "trustedCaPassword": "secret"
- }
- }
- }
-}
-
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index 5be75ab3..b1148a6a 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
@@ -16,25 +16,51 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
import com.google.gson.JsonElement;
+import com.google.gson.JsonIOException;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.Objects;
+import java.util.Map;
+import java.util.Properties;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
/**
* Tests the AppConfig.
@@ -44,167 +70,285 @@ import org.junit.jupiter.api.Test;
*/
class AppConfigTest {
- private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json";
- private static final boolean CORRECT_JSON = true;
- private static final boolean INCORRECT_JSON = false;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+
+
+ private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+ new ImmutableDmaapConsumerConfiguration.Builder() //
+ .timeoutMs(-1) //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("admin") //
+ .dmaapUserPassword("admin") //
+ .dmaapTopicName("events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .dmaapPortNumber(2222) //
+ .dmaapContentType("application/json") //
+ .messageLimit(-1) //
+ .dmaapProtocol("http") //
+ .consumerId("C12") //
+ .consumerGroup("OpenDcae-c12") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
+ .topicUrl(
+ "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = //
+ ImmutablePublisherConfiguration.builder() //
+ .publishUrl("https://message-router.onap.svc.cluster.local:3907/publish/1") //
+ .logUrl("https://dmaap.example.com/feedlog/972").trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .changeIdentifier("PM_MEAS_FILES") //
+ .userName("user") //
+ .passWord("password") //
+ .build();
+
+ private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
+ new ImmutableFtpesConfig.Builder() //
+ .keyCert("/config/dfc.jks") //
+ .keyPassword("secret") //
+ .trustedCa("config/ftp.jks") //
+ .trustedCaPassword("secret") //
+ .build();
+
+ private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+ new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapTopicName("/publish/1") //
+ .dmaapUserPassword("password") //
+ .dmaapPortNumber(3907) //
+ .dmaapProtocol("https") //
+ .dmaapContentType("application/octet-stream") //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("user") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static EnvProperties properties() {
+ return ImmutableEnvProperties.builder() //
+ .consulHost("host") //
+ .consulPort(123) //
+ .cbsName("cbsName") //
+ .appName("appName") //
+ .build();
+ }
- private static AppConfig appConfigUnderTest;
+ private AppConfig appConfigUnderTest;
+ private CloudConfigurationProvider cloudConfigurationProvider = mock(CloudConfigurationProvider.class);
+ private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
- private static String filePath =
- Objects.requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile();
@BeforeEach
public void setUp() {
appConfigUnderTest = spy(AppConfig.class);
+ appConfigUnderTest.setCloudConfigurationProvider(cloudConfigurationProvider);
+ appConfigUnderTest.systemEnvironment = new Properties();
}
@Test
- public void whenApplicationWasStarted_FilePathIsSet() {
+ public void whenTheConfigurationFits() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
+ doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
+ appConfigUnderTest.initialize();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(0)).loadConfigurationFromFile();
- Assertions.assertEquals(filePath, appConfigUnderTest.getFilepath());
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
+
+ ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration();
+ Assertions.assertNotNull(consumerCfg);
+ assertThat(consumerCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG);
+ assertThat(consumerCfg).isEqualToComparingFieldByField(CORRECT_CONSUMER_CONFIG);
+
+ PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER);
+ Assertions.assertNotNull(publisherCfg);
+ assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG);
+ assertThat(publisherCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG);
+
+ FtpesConfig ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
+ assertThat(ftpesConfig).isNotNull();
+ assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION);
}
@Test
- public void whenTheConfigurationFits_GetFtpsAndDmaapObjectRepresentationConfiguration() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
-
+ public void whenTheConfigurationFits_twoProducers() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getCorrectJsonTwoProducers()).when(appConfigUnderTest).createInputStream(any());
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNotNull(appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getDmaapPublisherConfiguration(),
- appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getDmaapConsumerConfiguration(),
- appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getFtpesConfiguration(), appConfigUnderTest.getFtpesConfiguration());
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER));
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("XX_FILES"));
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("YY_FILES"));
+
+ assertThat(appConfigUnderTest.getPublisherConfiguration("XX_FILES").publishUrl())
+ .isEqualTo("feed01::publish_url");
+ assertThat(appConfigUnderTest.getPublisherConfiguration("YY_FILES").publishUrl())
+ .isEqualTo("feed01::publish_url");
}
@Test
- public void whenFileIsNotExist_ThrowIoException() {
+ public void whenFileIsNotExist_ThrowException() throws DatafileTaskException {
// Given
- filePath = "/temp.json";
- appConfigUnderTest.setFilepath(filePath);
+ appConfigUnderTest.setFilepath("/temp.json");
// When
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining("No PublishingConfiguration loaded, changeIdentifier: PM_MEAS_FILES");
+ Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
@Test
- public void whenFileIsExistsButJsonIsIncorrect() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(INCORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
+ public void whenFileIsExistsButJsonIsIncorrect() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getIncorrectJson()).when(appConfigUnderTest).createInputStream(any());
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining(CHANGE_IDENTIFIER);
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
-
}
-
@Test
- public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
+ public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException, DatafileTaskException {
+
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
JsonElement jsonElement = mock(JsonElement.class);
when(jsonElement.isJsonObject()).thenReturn(false);
doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class));
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining(CHANGE_IDENTIFIER);
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
- private String getJsonConfig(boolean correct) {
- JsonObject dmaapConsumerConfigData = new JsonObject();
- dmaapConsumerConfigData.addProperty("dmaapHostName", "localhost");
- dmaapConsumerConfigData.addProperty("dmaapPortNumber", 2222);
- dmaapConsumerConfigData.addProperty("dmaapTopicName", "/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
- dmaapConsumerConfigData.addProperty("dmaapProtocol", "http");
- dmaapConsumerConfigData.addProperty("dmaapUserName", "admin");
- dmaapConsumerConfigData.addProperty("dmaapUserPassword", "admin");
- dmaapConsumerConfigData.addProperty("dmaapContentType", "application/json");
- dmaapConsumerConfigData.addProperty("consumerId", "C12");
- dmaapConsumerConfigData.addProperty("consumerGroup", "OpenDcae-c12");
- dmaapConsumerConfigData.addProperty("timeoutMs", -1);
- dmaapConsumerConfigData.addProperty("messageLimit", 1);
-
- JsonObject dmaapProducerConfigData = new JsonObject();
- dmaapProducerConfigData.addProperty("dmaapHostName", "localhost");
- dmaapProducerConfigData.addProperty("dmaapPortNumber", 3907);
- dmaapProducerConfigData.addProperty("dmaapTopicName", "publish");
- dmaapProducerConfigData.addProperty("dmaapProtocol", "https");
- if (correct) {
- dmaapProducerConfigData.addProperty("dmaapUserName", "dradmin");
- dmaapProducerConfigData.addProperty("dmaapUserPassword", "dradmin");
- dmaapProducerConfigData.addProperty("dmaapContentType", "application/octet-stream");
- }
-
- JsonObject dmaapConfigs = new JsonObject();
- dmaapConfigs.add("dmaapConsumerConfiguration", dmaapConsumerConfigData);
- dmaapConfigs.add("dmaapProducerConfiguration", dmaapProducerConfigData);
-
- JsonObject ftpesConfigData = new JsonObject();
- ftpesConfigData.addProperty("keyCert", "config/dfc.jks");
- ftpesConfigData.addProperty("keyPassword", "secret");
- ftpesConfigData.addProperty("trustedCa", "config/ftp.jks");
- ftpesConfigData.addProperty("trustedCaPassword", "secret");
-
- JsonObject security = new JsonObject();
- security.addProperty("trustStorePath", "trustStorePath");
- security.addProperty("trustStorePasswordPath", "trustStorePasswordPath");
- security.addProperty("keyStorePath", "keyStorePath");
- security.addProperty("keyStorePasswordPath", "keyStorePasswordPath");
- security.addProperty("enableDmaapCertAuth", "enableDmaapCertAuth");
-
- JsonObject ftpesConfiguration = new JsonObject();
- ftpesConfiguration.add("ftpesConfiguration", ftpesConfigData);
-
- JsonObject configs = new JsonObject();
- configs.add("dmaap", dmaapConfigs);
- configs.add("ftp", ftpesConfiguration);
- configs.add("security", security);
-
- JsonObject completeJson = new JsonObject();
- completeJson.add("configs", configs);
-
- return completeJson.toString();
+ @Test
+ public void whenPeriodicConfigRefreshNoEnvironmentVariables() {
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNextCount(0) //
+ .verifyComplete();
+
+ assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshNoConsul() {
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+ Mono<JsonObject> err = Mono.error(new IOException());
+ doReturn(err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNextCount(0) //
+ .verifyComplete();
+
+ assertTrue(logAppender.list.toString()
+ .contains("Could not refresh application configuration java.io.IOException"));
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+
+ Mono<JsonObject> json = Mono.just(getJsonRootObject());
+
+ doReturn(json, json).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNext(appConfigUnderTest) //
+ .verifyComplete();
+
+ Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException {
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+
+ Mono<JsonObject> json = Mono.just(getJsonRootObject());
+ Mono<JsonObject> err = Mono.error(new IOException()); // no config entry created by the
+ // dmaap plugin
+
+ doReturn(json, err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNext(appConfigUnderTest) //
+ .verifyComplete();
+
+ Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ }
+
+ private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
+ JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
+ return rootObject;
+ }
+
+ private static InputStream getCorrectJson() throws IOException {
+ URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test.json");
+ String string = Resources.toString(url, Charsets.UTF_8);
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private static InputStream getCorrectJsonTwoProducers() throws IOException {
+ URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test_2producers.json");
+ String string = Resources.toString(url, Charsets.UTF_8);
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private static InputStream getIncorrectJson() {
+ String string = "{" + //
+ " \"configs\": {" + //
+ " \"dmaap\": {"; //
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
deleted file mode 100644
index 07233d95..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import com.google.gson.JsonObject;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
-
-class CloudConfigParserTest {
- private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
- new ImmutableDmaapConsumerConfiguration.Builder() //
- .timeoutMs(-1) //
- .dmaapHostName("message-router.onap.svc.cluster.local") //
- .dmaapUserName("admin") //
- .dmaapUserPassword("admin") //
- .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
- .dmaapPortNumber(2222) //
- .dmaapContentType("application/json") //
- .messageLimit(-1) //
- .dmaapProtocol("http") //
- .consumerId("C12") //
- .consumerGroup("OpenDCAE-c12") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
- new ImmutableDmaapPublisherConfiguration.Builder() //
- .dmaapTopicName("publish") //
- .dmaapUserPassword("dradmin") //
- .dmaapPortNumber(3907) //
- .dmaapProtocol("https") //
- .dmaapContentType("application/json") //
- .dmaapHostName("message-router.onap.svc.cluster.local") //
- .dmaapUserName("dradmin") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
- new ImmutableFtpesConfig.Builder() //
- .keyCert("/config/dfc.jks") //
- .keyPassword("secret") //
- .trustedCa("config/ftp.jks") //
- .trustedCaPassword("secret") //
- .build();
-
- private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
-
- @Test
- public void shouldCreateDmaapConsumerConfigurationCorrectly() {
- DmaapConsumerConfiguration dmaapConsumerConfig = cloudConfigParser.getDmaapConsumerConfig();
-
- assertThat(dmaapConsumerConfig).isNotNull();
- assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG);
- }
-
- @Test
- public void shouldCreateDmaapPublisherConfigurationCorrectly() {
- DmaapPublisherConfiguration dmaapPublisherConfig = cloudConfigParser.getDmaapPublisherConfig();
-
- assertThat(dmaapPublisherConfig).isNotNull();
- assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG);
- }
-
- @Test
- public void shouldCreateFtpesConfigurationCorrectly() {
- FtpesConfig ftpesConfig = cloudConfigParser.getFtpesConfig();
-
- assertThat(ftpesConfig).isNotNull();
- assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION);
- }
-
- public JsonObject getCloudConfigJsonObject() {
- JsonObject config = new JsonObject();
- config.addProperty("dmaap.dmaapConsumerConfiguration.timeoutMs", -1);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserName", "admin");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserPassword", "admin");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapTopicName",
- "/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapPortNumber", 2222);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapContentType", "application/json");
- config.addProperty("dmaap.dmaapConsumerConfiguration.messageLimit", -1);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapProtocol", "http");
- config.addProperty("dmaap.dmaapConsumerConfiguration.consumerId", "C12");
- config.addProperty("dmaap.dmaapConsumerConfiguration.consumerGroup", "OpenDCAE-c12");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapTopicName", "publish");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapProtocol", "https");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapContentType", "application/json");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapPortNumber", 3907);
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserName", "dradmin");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserPassword", "dradmin");
- config.addProperty("dmaap.ftpesConfig.keyCert", "/config/dfc.jks");
- config.addProperty("dmaap.ftpesConfig.keyPassword", "secret");
- config.addProperty("dmaap.ftpesConfig.trustedCa", "config/ftp.jks");
- config.addProperty("dmaap.ftpesConfig.trustedCaPassword", "secret");
-
- config.addProperty("dmaap.security.trustStorePath", "trustStorePath");
- config.addProperty("dmaap.security.trustStorePasswordPath", "trustStorePasswordPath");
- config.addProperty("dmaap.security.keyStorePath", "keyStorePath");
- config.addProperty("dmaap.security.keyStorePasswordPath", "keyStorePasswordPath");
- config.addProperty("dmaap.security.enableDmaapCertAuth", "true");
-
- return config;
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
index 6e2140b4..eba88c33 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -35,26 +36,40 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
+
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
+
import reactor.test.StepVerifier;
public class SchedulerConfigTest {
+ private final AppConfig appConfigurationMock = mock(AppConfig.class);
+ private final TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
+ private final ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
+ private final SchedulerConfig schedulerUnderTest =
+ spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock));
+
+ @BeforeEach
+ public void setUp() {
+ doNothing().when(appConfigurationMock).stop();
+ doNothing().when(appConfigurationMock).initialize();
+ }
+
@Test
public void getResponseFromCancellationOfTasks_success() {
+
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
scheduledFutureList.add(scheduledFutureMock);
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
- SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null);
-
String msg = "Datafile Service has already been stopped!";
StepVerifier.create(schedulerUnderTest.getResponseFromCancellationOfTasks())
.expectNext(new ResponseEntity<String>(msg, HttpStatus.CREATED)) //
@@ -68,24 +83,17 @@ public class SchedulerConfigTest {
@Test
public void tryToStartTaskWhenNotStarted_success() {
- TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
- ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
- CloudConfiguration cloudConfigurationMock = mock(CloudConfiguration.class);
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
SchedulerConfig schedulerUnderTestSpy =
- spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, cloudConfigurationMock));
+ spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock));
boolean actualResult = schedulerUnderTestSpy.tryToStartTask();
assertTrue(actualResult);
- ArgumentCaptor<Runnable> runTaskRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
- verify(taskSchedulerMock).scheduleAtFixedRate(runTaskRunnableCaptor.capture(), any(Instant.class),
- eq(Duration.ofMinutes(5)));
-
ArgumentCaptor<Runnable> scheduleMainDatafileEventTaskCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(taskSchedulerMock).scheduleWithFixedDelay(scheduleMainDatafileEventTaskCaptor.capture(),
eq(Duration.ofSeconds(15)));
@@ -100,22 +108,22 @@ public class SchedulerConfigTest {
verify(scheduledTasksMock).executeDatafileMainTask();
verifyNoMoreInteractions(scheduledTasksMock);
- runTaskRunnableCaptor.getValue().run();
- verify(cloudConfigurationMock).runTask();
- verifyNoMoreInteractions(cloudConfigurationMock);
+ verify(appConfigurationMock).initialize();
+ verifyNoMoreInteractions(appConfigurationMock);
- assertEquals(3, scheduledFutureList.size());
+ assertEquals(2, scheduledFutureList.size());
}
@Test
public void tryToStartTaskWhenAlreadyStarted_shouldReturnFalse() {
+ doNothing().when(appConfigurationMock).loadConfigurationFromFile();
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
scheduledFutureList.add(scheduledFutureMock);
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
- SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null);
+ SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, appConfigurationMock);
boolean actualResult = schedulerUnderTest.tryToStartTask();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
deleted file mode 100644
index 7f6b8c51..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.integration;
-
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.verify;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
-
-/**
- * Integration test for the ScheduledXmlContext.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-
-@Configuration
-@ComponentScan
-@ExtendWith({ SpringExtension.class })
-@ContextConfiguration(locations = { "classpath:scheduled-context.xml" })
-class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
-
- private static final int WAIT_FOR_SCHEDULING = 1;
-
- @Autowired
- private ScheduledTasks scheduledTask;
-
- @Test
- void testScheduling() {
- final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS);
- }
-
- private void verifyDmaapConsumerTask() {
- verify(scheduledTask, atLeast(1)).executeDatafileMainTask();
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
deleted file mode 100644
index 83c92ef4..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.model;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class FilePublishInformationTest {
- private static final String PRODUCT_NAME = "NrRadio";
- private static final String VENDOR_NAME = "Ericsson";
- private static final String LAST_EPOCH_MICROSEC = "8745745764578";
- private static final String SOURCE_NAME = "oteNB5309";
- private static final String START_EPOCH_MICROSEC = "8745745764578";
- private static final String TIME_ZONE_OFFSET = "UTC+05:00";
- private static final String NAME = "A20161224.1030-1045.bin.gz";
- private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz";
- private static final Path INTERNAL_LOCATION = Paths.get("target/A20161224.1030-1045.bin.gz");
- private static final String COMPRESSION = "gzip";
- private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
- private static final String FILE_FORMAT_VERSION = "V10";
-
- @Test
- public void filePublishInformationBuilder_shouldBuildAnObject() {
- FilePublishInformation filePublishInformation = ImmutableFilePublishInformation.builder() //
- .productName(PRODUCT_NAME) //
- .vendorName(VENDOR_NAME) //
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
- .sourceName(SOURCE_NAME) //
- .startEpochMicrosec(START_EPOCH_MICROSEC) //
- .timeZoneOffset(TIME_ZONE_OFFSET) //
- .name(NAME) //
- .location(LOCATION) //
- .internalLocation(INTERNAL_LOCATION) //
- .compression(COMPRESSION) //
- .fileFormatType(FILE_FORMAT_TYPE) //
- .fileFormatVersion(FILE_FORMAT_VERSION) //
- .context(new HashMap<String,String>()) //
- .build();
-
- Assertions.assertNotNull(filePublishInformation);
- Assertions.assertEquals(PRODUCT_NAME, filePublishInformation.getProductName());
- Assertions.assertEquals(VENDOR_NAME, filePublishInformation.getVendorName());
- Assertions.assertEquals(LAST_EPOCH_MICROSEC, filePublishInformation.getLastEpochMicrosec());
- Assertions.assertEquals(SOURCE_NAME, filePublishInformation.getSourceName());
- Assertions.assertEquals(START_EPOCH_MICROSEC, filePublishInformation.getStartEpochMicrosec());
- Assertions.assertEquals(TIME_ZONE_OFFSET, filePublishInformation.getTimeZoneOffset());
- Assertions.assertEquals(NAME, filePublishInformation.getName());
- Assertions.assertEquals(LOCATION, filePublishInformation.getLocation());
- Assertions.assertEquals(INTERNAL_LOCATION, filePublishInformation.getInternalLocation());
- Assertions.assertEquals(COMPRESSION, filePublishInformation.getCompression());
- Assertions.assertEquals(FILE_FORMAT_TYPE, filePublishInformation.getFileFormatType());
- Assertions.assertEquals(FILE_FORMAT_VERSION, filePublishInformation.getFileFormatVersion());
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index becfba31..8c7938bf 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -22,9 +22,12 @@ import static org.mockito.Mockito.spy;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
@@ -36,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
+
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -65,7 +69,7 @@ class JsonMessageParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_oneFileReadyMessage() {
+ void whenPassingCorrectJson_oneFileReadyMessage() throws URISyntaxException {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
index e21bbd7b..a71521cd 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.net.URI;
+
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
@@ -35,7 +35,9 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
+
import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
@@ -192,10 +194,4 @@ class DmaapProducerHttpClientTest {
Header[] authorizationHeaders = request.getHeaders("Authorization");
assertEquals(base64Creds, authorizationHeaders[0].getValue());
}
-
- @Test
- public void getBaseUri_success() {
- URI uri = producerClientUnderTestSpy.getBaseUri().build();
- assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString());
- }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
index 5e737253..574ad18e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
@@ -77,6 +77,7 @@ public class DMaaPMessageConsumerTest {
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<FilePublishInformation>();
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private DMaaPConsumerReactiveHttpClient httpClientMock;
@@ -173,6 +174,7 @@ public class DMaaPMessageConsumerTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.context(new HashMap<String,String>()) //
.build();
listOfFilePublishInformation.add(filePublishInformation);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 8f768d38..463c62c9 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -47,14 +47,12 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
import reactor.test.StepVerifier;
@@ -65,6 +63,7 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
class DataRouterPublisherTest {
+
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
private static final String LAST_EPOCH_MICROSEC = "8745745764578";
@@ -73,6 +72,7 @@ class DataRouterPublisherTest {
private static final String TIME_ZONE_OFFSET = "UTC+05:00";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private static final String COMPRESSION = "gzip";
private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
@@ -90,15 +90,17 @@ class DataRouterPublisherTest {
private static FilePublishInformation filePublishInformation;
private static DmaapProducerHttpClient httpClientMock;
private static AppConfig appConfig;
- private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
private static Map<String, String> context = new HashMap<>();
private static DataRouterPublisher publisherTaskUnderTestSpy;
+ // "https://54.45.333.2:1234/publish/1";
+ private static final String PUBLISH_URL =
+ HTTPS_SCHEME + "://" + HOST + ":" + PORT + "/" + PUBLISH_TOPIC + "/" + FEED_ID;
+
@BeforeAll
public static void setUp() {
- when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
- when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
- when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+ when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
filePublishInformation = ImmutableFilePublishInformation.builder() //
.productName(PRODUCT_NAME) //
@@ -114,6 +116,7 @@ class DataRouterPublisherTest {
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
.context(context) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
appConfig = mock(AppConfig.class);
publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
@@ -128,7 +131,6 @@ class DataRouterPublisherTest {
.verifyComplete();
ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
- verify(httpClientMock).getBaseUri();
verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
verifyNoMoreInteractions(httpClientMock);
@@ -138,6 +140,7 @@ class DataRouterPublisherTest {
assertEquals(HTTPS_SCHEME, actualUri.getScheme());
assertEquals(HOST, actualUri.getHost());
assertEquals(PORT, actualUri.getPort());
+
Path actualPath = Paths.get(actualUri.getPath());
assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
@@ -160,7 +163,8 @@ class DataRouterPublisherTest {
assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
- // Note that the following line checks the number of properties that are sent to the data router.
+ // Note that the following line checks the number of properties that are sent to the data
+ // router.
// This should be 10 unless the API is updated (which is the fields checked above)
assertEquals(10, metaHash.size());
}
@@ -185,7 +189,6 @@ class DataRouterPublisherTest {
.expectNext(filePublishInformation) //
.verifyComplete();
- verify(httpClientMock, times(2)).getBaseUri();
verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
verifyNoMoreInteractions(httpClientMock);
@@ -201,7 +204,6 @@ class DataRouterPublisherTest {
.expectErrorMessage("Retries exhausted: 1/1") //
.verify();
- verify(httpClientMock, times(2)).getBaseUri();
verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
verifyNoMoreInteractions(httpClientMock);
@@ -211,12 +213,9 @@ class DataRouterPublisherTest {
final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
throws Exception {
httpClientMock = mock(DmaapProducerHttpClient.class);
- when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
- doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration();
- doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient();
-
- UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
- when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+ when(appConfig.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
+ doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
+ doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(CHANGE_IDENTIFIER);
HttpResponse httpResponseMock = mock(HttpResponse.class);
if (exception == null) {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index cad3486d..299a0238 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -83,6 +83,7 @@ public class FileCollectorTest {
private static final String FTP_KEY_PASSWORD = "ftpKeyPassword";
private static final String TRUSTED_CA_PATH = "trustedCAPath";
private static final String TRUSTED_CA_PASSWORD = "trustedCAPassword";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private static AppConfig appConfigMock = mock(AppConfig.class);
private static FtpesConfig ftpesConfigMock = mock(FtpesConfig.class);
@@ -132,7 +133,8 @@ public class FileCollectorTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
- .context(new HashMap<String,String>())
+ .context(new HashMap<String,String>()) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
index 83643637..44755814 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
@@ -34,10 +34,9 @@ import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
@@ -47,55 +46,47 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
public class PublishedCheckerTest {
+ private static final String PUBLISH_URL = "https://54.45.33.2:1234/";
private static final String EMPTY_CONTENT = "[]";
- private static final String FEEDLOG_TOPIC = "feedlog";
- private static final String FEED_ID = "1";
- private static final String HTTPS_SCHEME = "https";
- private static final String HOST = "54.45.33.2";
- private static final int PORT = 1234;
private static final String SOURCE_NAME = "oteNB5309";
private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ private static final String LOG_URI = "https://localhost:3907/feedlog/1";
private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
- private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
private static AppConfig appConfigMock;
private DmaapProducerHttpClient httpClientMock = mock(DmaapProducerHttpClient.class);
private PublishedChecker publishedCheckerUnderTestSpy;
- /**
- * Sets up data for the tests.
- */
+
@BeforeAll
- public static void setUp() {
- when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
- when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
- when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+ public static void setUp() throws DatafileTaskException {
+ when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
appConfigMock = mock(AppConfig.class);
- when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ when(appConfigMock.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
}
@Test
public void executeWhenNotPublished_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
- verify(httpClientMock).getBaseUri();
verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), any(), any());
verifyNoMoreInteractions(httpClientMock);
@@ -103,22 +94,17 @@ public class PublishedCheckerTest {
HttpUriRequest getRequest = requestCaptor.getValue();
assertTrue(getRequest instanceof HttpGet);
URI actualUri = getRequest.getURI();
- assertEquals(HTTPS_SCHEME, actualUri.getScheme());
- assertEquals(HOST, actualUri.getHost());
- assertEquals(PORT, actualUri.getPort());
- Path actualPath = Paths.get(actualUri.getPath());
- assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString()));
- assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
- String actualQuery = actualUri.getQuery();
- assertTrue(actualQuery.contains("type=pub"));
- assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME));
+ // https://localhost:3907/feedlog/1?type=pub&filename=oteNB5309_A20161224.1030-1045.bin.gz
+ String expUri = LOG_URI + "?type=pub&filename=" + LOCAL_FILE_NAME;
+ assertEquals(expUri, actualUri.toString());
}
@Test
public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
}
@@ -127,7 +113,8 @@ public class PublishedCheckerTest {
public void executeWhenPublished_returnsTrue() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertTrue(isPublished);
}
@@ -136,7 +123,8 @@ public class PublishedCheckerTest {
public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
}
@@ -144,11 +132,9 @@ public class PublishedCheckerTest {
final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception {
publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock));
- doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration();
- doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient();
-
- UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
- when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+ doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
+ doReturn(LOG_URI).when(publisherConfigurationMock).logUrl();
+ doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient(publisherConfigurationMock);
HttpResponse httpResponseMock = mock(HttpResponse.class);
if (exception == null) {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 0d5a4231..a1021868 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -42,6 +42,11 @@ import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
@@ -51,8 +56,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformati
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -61,6 +64,7 @@ import reactor.test.StepVerifier;
public class ScheduledTasksTest {
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private AppConfig appConfig = mock(AppConfig.class);
private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
@@ -72,23 +76,33 @@ public class ScheduledTasksTest {
private DataRouterPublisher dataRouterMock;
private Map<String, String> contextMap = new HashMap<String, String>();
+ private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
+
@BeforeEach
- private void setUp() {
- DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
- .dmaapContentType("application/json") //
- .dmaapHostName("54.45.33.2") //
- .dmaapPortNumber(1234) //
- .dmaapProtocol("https") //
- .dmaapUserName("DFC") //
- .dmaapUserPassword("DFC") //
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ private void setUp() throws DatafileTaskException {
+ final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
+ .publishUrl(publishUrl) //
+ .logUrl("") //
+ .userName("userName") //
+ .passWord("passWord") //
.trustStorePath("trustStorePath") //
.trustStorePasswordPath("trustStorePasswordPath") //
.keyStorePath("keyStorePath") //
.keyStorePasswordPath("keyStorePasswordPath") //
.enableDmaapCertAuth(true) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
+ .topicUrl("topicUrl").trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
+ doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
+ doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
consumerMock = mock(DMaaPMessageConsumer.class);
publishedCheckerMock = mock(PublishedChecker.class);
@@ -109,7 +123,7 @@ public class ScheduledTasksTest {
.sourceName("") //
.startEpochMicrosec("") //
.timeZoneOffset("") //
- .changeIdentifier("") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.changeType("") //
.build();
}
@@ -164,11 +178,12 @@ public class ScheduledTasksTest {
.compression("") //
.fileFormatType("") //
.fileFormatVersion("") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.context(new HashMap<String, String>()).build();
}
@Test
- public void notingToConsume() {
+ public void notingToConsume() throws DatafileTaskException {
doReturn(consumerMock).when(testedObject).createConsumerTask();
doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
@@ -180,7 +195,7 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_successfulCase() {
+ public void consume_successfulCase() throws DatafileTaskException {
final int noOfEvents = 200;
final int noOfFilesPerEvent = 200;
final int noOfFiles = noOfEvents * noOfFilesPerEvent;
@@ -188,7 +203,7 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
@@ -212,11 +227,11 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_fetchFailedOnce() {
+ public void consume_fetchFailedOnce() throws DatafileTaskException {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
Mono<Object> error = Mono.error(new Exception("problem"));
@@ -246,12 +261,12 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_publishFailedOnce() {
+ public void consume_publishFailedOnce() throws DatafileTaskException {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
@@ -279,7 +294,7 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_successfulCase_sameFileNames() {
+ public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
final int noOfEvents = 1;
final int noOfFilesPerEvent = 100;
@@ -287,7 +302,7 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
@@ -303,7 +318,7 @@ public class ScheduledTasksTest {
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
- verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull());
+ verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints.json b/datafile-app-server/src/test/resources/datafile_endpoints.json
deleted file mode 100644
index 14dee368..00000000
--- a/datafile-app-server/src/test/resources/datafile_endpoints.json
+++ /dev/null
@@ -1,44 +0,0 @@
-{
- "configs": {
- "dmaap": {
- "dmaapConsumerConfiguration": {
- "consumerId": "C12",
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 2222,
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
- "dmaapProtocol": "http",
- "dmaapUserName": "admin",
- "dmaapUserPassword": "admin",
- "dmaapContentType": "application/json",
- "consumerGroup": "OpenDcae-c12",
- "timeoutMs": -1,
- "messageLimit": 1
- },
- "dmaapProducerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 3907,
- "dmaapProtocol": "https",
- "dmaapTopicName": "publish",
- "dmaapUserName": "dradmin",
- "dmaapUserPassword": "dradmin",
- "dmaapContentType": "application/octet-stream"
- }
- },
- "ftp": {
- "ftpesConfiguration": {
- "keyCert": "/config/dfc.jks",
- "keyPassword": "secret",
- "trustedCa": "/config/ftp.jks",
- "trustedCaPassword": "secret"
- }
- },
- "security": {
- "trustStorePath" : "trustStorePath",
- "trustStorePasswordPath" : "trustStorePasswordPath",
- "keyStorePath" : "keyStorePath",
- "keyStorePasswordPath" : "keyStorePasswordPath",
- "enableDmaapCertAuth" : "enableDmaapCertAuth"
- }
- }
-}
-
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test.json b/datafile-app-server/src/test/resources/datafile_endpoints_test.json
new file mode 100644
index 00000000..4d4d00ab
--- /dev/null
+++ b/datafile-app-server/src/test/resources/datafile_endpoints_test.json
@@ -0,0 +1,31 @@
+{
+ "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
+ "dmaap.ftpesConfig.keyPassword":"secret",
+ "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
+ "dmaap.ftpesConfig.trustedCaPassword":"secret",
+ "dmaap.security.trustStorePath":"trustStorePath",
+ "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
+ "dmaap.security.keyStorePath":"keyStorePath",
+ "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath",
+ "dmaap.security.enableDmaapCertAuth":"true",
+ "dmaap.dmaapProducerConfiguration": {
+ "changeIdentifier":"PM_MEAS_FILES",
+ "feedName":"feed00"
+ },
+ "streams_subscribes":{
+ "dmaap_subscriber":{
+ "dmmap_info":{
+ "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ },
+ "type":"message_router"
+ }
+ },
+ "feed00":{
+ "username":"user",
+ "log_url":"https://dmaap.example.com/feedlog/972",
+ "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1",
+ "location":"loc00",
+ "password":"password",
+ "publisher_id":"972.360gm"
+ }
+}
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
new file mode 100644
index 00000000..a7e2497d
--- /dev/null
+++ b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
@@ -0,0 +1,49 @@
+{
+ "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
+ "dmaap.ftpesConfig.keyPassword":"secret",
+ "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
+ "dmaap.ftpesConfig.trustedCaPassword":"secret",
+ "dmaap.security.trustStorePath":"trustStorePath",
+ "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
+ "dmaap.security.keyStorePath":"keyStorePath",
+ "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath",
+ "dmaap.security.enableDmaapCertAuth":"true",
+ "dmaap.dmaapProducerConfiguration":[
+ {
+ "changeIdentifier":"PM_MEAS_FILES",
+ "feedName":"feed00"
+ },
+ {
+ "changeIdentifier":"XX_FILES",
+ "feedName":"feed01"
+ },
+ {
+ "changeIdentifier":"YY_FILES",
+ "feedName":"feed01"
+ }
+ ],
+ "streams_subscribes":{
+ "dmaap_subscriber":{
+ "dmmap_info":{
+ "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ },
+ "type":"message_router"
+ }
+ },
+ "feed00":{
+ "username":"user",
+ "log_url":"https://dmaap.example.com/feedlog/972",
+ "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1",
+ "location":"loc00",
+ "password":"password",
+ "publisher_id":"972.360gm"
+ },
+ "feed01":{
+ "username":"user",
+ "log_url":"feed01::log_url",
+ "publish_url":"feed01::publish_url",
+ "location":"loc00",
+ "password":"",
+ "publisher_id":""
+ }
+}