From 4229afc64d82cbd1ea1e43c92fcd6c9bed9e5137 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Wed, 29 May 2019 09:39:53 +0000 Subject: Generalizing Data File Collection to handle any type of file Extension of the DFC to be able to handle any file types types, which are published on different DR feeds. This association between file type and DR feed is defined by configuration. The file type is defined by the changeIdentifier in the fileReady VES message reported from the PNF. The creation of DR feeds and configuration will be done by the DMAAP plugin, but that is not tested yet. Change-Id: I13b36acd926a6941ee733e6b37922049fb54a5d9 Issue-ID: DCAEGEN2-1532 Signed-off-by: PatrikBuhr --- datafile-app-server/config/datafile_endpoints.json | 71 ++-- datafile-app-server/pom.xml | 13 + .../datafile/configuration/AppConfig.java | 211 ++++++++---- .../datafile/configuration/CloudConfigParser.java | 141 +++++--- .../datafile/configuration/CloudConfiguration.java | 110 ------- .../configuration/ConsumerConfiguration.java | 105 ++++++ .../configuration/EnvironmentProcessor.java | 4 +- .../datafile/configuration/FtpesConfig.java | 5 +- .../configuration/PublisherConfiguration.java | 74 +++++ .../datafile/configuration/SchedulerConfig.java | 24 +- .../datafile/configuration/SwaggerConfig.java | 2 +- .../datafile/configuration/TomcatHttpConfig.java | 2 +- .../datafile/controllers/ScheduleController.java | 4 +- .../collectors/datafile/ftp/FileServerData.java | 3 + .../collectors/datafile/ftp/FtpsClient.java | 4 +- .../collectors/datafile/ftp/SftpClient.java | 8 +- .../collectors/datafile/model/FileData.java | 4 +- .../datafile/model/FilePublishInformation.java | 4 + .../collectors/datafile/model/JsonSerializer.java | 15 +- .../datafile/service/JsonMessageParser.java | 46 +-- .../datafile/service/PublishedFileCache.java | 2 +- .../service/producer/DmaapProducerHttpClient.java | 20 +- .../datafile/tasks/DMaaPMessageConsumer.java | 9 +- .../datafile/tasks/DataRouterPublisher.java | 47 +-- .../collectors/datafile/tasks/FileCollector.java | 13 +- .../datafile/tasks/PublishedChecker.java | 52 +-- .../collectors/datafile/tasks/ScheduledTasks.java | 58 +++- .../src/main/resources/datafile_endpoints.json | 37 --- .../datafile/configuration/AppConfigTest.java | 356 +++++++++++++++------ .../configuration/CloudConfigParserTest.java | 133 -------- .../configuration/SchedulerConfigTest.java | 38 ++- .../integration/ScheduledXmlContextITest.java | 63 ---- .../datafile/model/FilePublishInformationTest.java | 72 ----- .../datafile/service/JsonMessageParserTest.java | 6 +- .../producer/DmaapProducerHttpClientTest.java | 10 +- .../datafile/tasks/DMaaPMessageConsumerTest.java | 2 + .../datafile/tasks/DataRouterPublisherTest.java | 33 +- .../datafile/tasks/FileCollectorTest.java | 4 +- .../datafile/tasks/PublishedCheckerTest.java | 62 ++-- .../datafile/tasks/ScheduledTasksTest.java | 61 ++-- .../src/test/resources/datafile_endpoints.json | 44 --- .../test/resources/datafile_endpoints_test.json | 31 ++ .../datafile_endpoints_test_2producers.json | 49 +++ 43 files changed, 1097 insertions(+), 955 deletions(-) delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java delete mode 100644 datafile-app-server/src/main/resources/datafile_endpoints.json delete mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java delete mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java delete mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java delete mode 100644 datafile-app-server/src/test/resources/datafile_endpoints.json create mode 100644 datafile-app-server/src/test/resources/datafile_endpoints_test.json create mode 100644 datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json (limited to 'datafile-app-server') diff --git a/datafile-app-server/config/datafile_endpoints.json b/datafile-app-server/config/datafile_endpoints.json index 833f1e91..cd1b502f 100644 --- a/datafile-app-server/config/datafile_endpoints.json +++ b/datafile-app-server/config/datafile_endpoints.json @@ -1,44 +1,33 @@ { - "configs": { - "dmaap": { - "dmaapConsumerConfiguration": { - "dmaapHostName": "localhost", - "dmaapPortNumber": 2222, - "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT", - "dmaapProtocol": "http", - "dmaapUserName": "", - "dmaapUserPassword": "", - "dmaapContentType": "application/json", - "consumerId": "C12", - "consumerGroup": "OpenDcae-c12", - "timeoutMs": -1, - "messageLimit": 1 - }, - "dmaapProducerConfiguration": { - "dmaapHostName": "localhost", - "dmaapPortNumber": 3907, - "dmaapTopicName": "publish", - "dmaapProtocol": "https", - "dmaapUserName": "dradmin", - "dmaapUserPassword": "dradmin", - "dmaapContentType": "application/octet-stream" - } - }, - "ftp": { - "ftpesConfiguration": { - "keyCert": "config/dfc.jks", - "keyPassword": "secret", - "trustedCa": "config/ftp.jks", - "trustedCaPassword": "secret" - } - }, - "security": { - "trustStorePath" : "change it", - "trustStorePasswordPath" : "change it", - "keyStorePath" : "change it", - "keyStorePasswordPath" : "change it", - "enableDmaapCertAuth" : "false" - } - } + "//description":"This file is only used for testing purposes", + "dmaap.ftpesConfig.keyCert":"/config/dfc.jks", + "dmaap.ftpesConfig.keyPassword":"secret", + "dmaap.ftpesConfig.trustedCa":"config/ftp.jks", + "dmaap.ftpesConfig.trustedCaPassword":"secret", + "dmaap.security.trustStorePath":"change it", + "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath", + "dmaap.security.keyStorePath":"keyStorePath", + "dmaap.security.keyStorePasswordPath":"change it", + "dmaap.security.enableDmaapCertAuth":"false", + "dmaap.dmaapProducerConfiguration" : { + "changeIdentifier":"PM_MEAS_FILES", + "feedName":"feed00" + }, + "streams_subscribes":{ + "dmaap_subscriber":{ + "dmmap_info":{ + "topic_url":"http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + }, + "type":"message_router" + } + }, + "feed00":{ + "username":"user", + "log_url":"https://localhost:3907/feedlog/1", + "publish_url":"https://localhost:3907/publish/1", + "location":"loc00", + "password":"dradmin", + "publisher_id":"972.360gm" + } } diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml index 42bdd771..e3c60092 100644 --- a/datafile-app-server/pom.xml +++ b/datafile-app-server/pom.xml @@ -65,6 +65,19 @@ com.jcraft jsch + + org.springframework.boot + spring-boot-starter-actuator + + + javax.xml.bind + jaxb-api + + + org.springframework.boot + spring-boot-configuration-processor + true + diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index a38eab8f..d324ca99 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -22,22 +22,38 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.time.Duration; +import java.util.Map; +import java.util.Properties; import java.util.ServiceLoader; + import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; + +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + /** * Holds all configuration for the DFC. * @@ -46,105 +62,174 @@ import org.springframework.stereotype.Component; */ @Component +@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers") @EnableConfigurationProperties @ConfigurationProperties("app") public class AppConfig { - - private static final String CONFIG = "configs"; - private static final String DMAAP = "dmaap"; - private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; - private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; - private static final String FTP = "ftp"; - private static final String FTPES_CONFIGURATION = "ftpesConfiguration"; - private static final String SECURITY = "security"; private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); - private DmaapConsumerConfiguration dmaapConsumerConfiguration; - private DmaapPublisherConfiguration dmaapPublisherConfiguration; + private ConsumerConfiguration dmaapConsumerConfiguration; + private Map publishingConfiguration; private FtpesConfig ftpesConfiguration; + private CloudConfigurationProvider cloudConfigurationProvider; + @Value("#{systemEnvironment}") + Properties systemEnvironment; + private Disposable refreshConfigTask = null; @NotEmpty private String filepath; - public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + @Autowired + public synchronized void setCloudConfigurationProvider( + CloudConfigurationProvider reactiveCloudConfigurationProvider) { + this.cloudConfigurationProvider = reactiveCloudConfigurationProvider; + } + + public synchronized void setFilepath(String filepath) { + this.filepath = filepath; + } + + /** + * Reads the cloud configuration. + */ + public void initialize() { + stop(); + Map context = MappedDiagnosticContext.initializeTraceContext(); + loadConfigurationFromFile(); + + refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5)) + .flatMap(count -> createRefreshConfigurationTask(count, context)) + .subscribe(e -> logger.info("Refreshed configuration data"), + throwable -> logger.error("Configuration refresh terminated due to exception", throwable), + () -> logger.error("Configuration refresh terminated")); + } + + public void stop() { + if (refreshConfigTask != null) { + refreshConfigTask.dispose(); + refreshConfigTask = null; + } + } + + public synchronized ConsumerConfiguration getDmaapConsumerConfiguration() { return dmaapConsumerConfiguration; } - public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return dmaapPublisherConfiguration; + public synchronized boolean isFeedConfigured(String changeIdentifier) { + return publishingConfiguration.containsKey(changeIdentifier); + } + + public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier) + throws DatafileTaskException { + + if (publishingConfiguration == null) { + throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier); + } + PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier); + if (cfg == null) { + throw new DatafileTaskException( + "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier); + } + return cfg; } public synchronized FtpesConfig getFtpesConfiguration() { return ftpesConfiguration; } + Flux createRefreshConfigurationTask(Long counter, Map context) { + return Flux.just(counter) // + .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) // + .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) // + .flatMap(this::fetchConfiguration); + } + + Mono readEnvironmentVariables(Properties systemEnvironment, Map context) { + return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) + .onErrorResume(AppConfig::onErrorResume); + } + + private static Mono onErrorResume(Throwable trowable) { + logger.error("Could not refresh application configuration {}", trowable.toString()); + return Mono.empty(); + } + + private Mono fetchConfiguration(EnvProperties env) { + Mono serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) // + .onErrorResume(AppConfig::onErrorResume); + + // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the + // other ones does not work + EnvProperties dmaapEnv = ImmutableEnvProperties.builder() // + .consulHost(env.consulHost()) // + .consulPort(env.consulPort()) // + .cbsName(env.cbsName()) // + .appName(env.appName() + ":dmaap") // + .build(); // + Mono dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv) + .onErrorResume(t -> Mono.just(new JsonObject())); + + return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) // + .onErrorResume(AppConfig::onErrorResume); + } + + /** + * parse configuration + * + * @param serviceConfigRootObject + * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken + * from the serviceConfigRootObject + * @return this which is updated if successful + */ + private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) { + try { + CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject); + setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(), + parser.getFtpesConfig()); + } catch (DatafileTaskException e) { + logger.error("Could not parse configuration {}", e.toString(), e); + } + return this; + } + /** * Reads the configuration from file. */ - public void loadConfigurationFromFile() { + void loadConfigurationFromFile() { GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); - JsonParser parser = new JsonParser(); - JsonObject jsonObject; + try (InputStream inputStream = createInputStream(filepath)) { - JsonElement rootElement = getJsonElement(parser, inputStream); - if (rootElement.isJsonObject()) { - jsonObject = rootElement.getAsJsonObject(); - FtpesConfig ftpesConfig = deserializeType(gsonBuilder, - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION), - FtpesConfig.class); - DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder, - concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) - .getAsJsonObject(DMAAP_CONSUMER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapConsumerConfiguration.class); - - DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder, - concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) - .getAsJsonObject(DMAAP_PRODUCER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapPublisherConfiguration.class); - - setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig); + JsonParser parser = new JsonParser(); + JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject(); + if (rootObject == null) { + throw new JsonSyntaxException("Root is not a json object"); } + parseCloudConfig(rootObject, rootObject); } catch (JsonSyntaxException | IOException e) { - logger.error("Problem with loading configuration, file: {}", filepath, e); + logger.warn("Local configuration file not loaded: {}", filepath, e); } } - synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration, - DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) { - this.dmaapConsumerConfiguration = consumerConfiguration; - this.dmaapPublisherConfiguration = publisherConfiguration; - this.ftpesConfiguration = ftpesConfig; + private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration, + Map publisherConfiguration, FtpesConfig ftpesConfig) { + if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) { + logger.error( + "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}", + consumerConfiguration, publisherConfiguration, ftpesConfig); + } else { + this.dmaapConsumerConfiguration = consumerConfiguration; + this.publishingConfiguration = publisherConfiguration; + this.ftpesConfiguration = ftpesConfig; + } } JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { return parser.parse(new InputStreamReader(inputStream)); } - private T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject, - @NotNull Class type) { - return gsonBuilder.create().fromJson(jsonObject, type); - } - InputStream createInputStream(@NotNull String filepath) throws IOException { return new BufferedInputStream(new FileInputStream(filepath)); } - synchronized String getFilepath() { - return this.filepath; - } - - public synchronized void setFilepath(String filepath) { - this.filepath = filepath; - } - - private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) { - source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue())); - return target; - } - } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index 6b7860c4..3ac6b2c6 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -18,11 +18,17 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; /** @@ -32,63 +38,106 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl * @author Henrik Andersson */ public class CloudConfigParser { - private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath"; private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath"; private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath"; private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath"; private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth"; - private final JsonObject jsonObject; + private final JsonObject serviceConfigurationRoot; + private final JsonObject dmaapConfigurationRoot; - CloudConfigParser(JsonObject jsonObject) { - this.jsonObject = jsonObject; + public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) { + this.serviceConfigurationRoot = serviceConfigurationRoot; + this.dmaapConfigurationRoot = dmaapConfigurationRoot; } - DmaapPublisherConfiguration getDmaapPublisherConfig() { - return new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString()) - .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) - .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) - .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // - .build(); + public Map getDmaapPublisherConfig() throws DatafileTaskException { + Iterator producerCfgs = + toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator(); + + Map result = new HashMap<>(); + + while (producerCfgs.hasNext()) { + JsonObject producerCfg = producerCfgs.next().getAsJsonObject(); + String feedName = getAsString(producerCfg, "feedName"); + JsonObject feedConfig = getFeedConfig(feedName); + + PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() // + .publishUrl(getAsString(feedConfig, "publish_url")) // + .passWord(getAsString(feedConfig, "password")) // + .userName(getAsString(feedConfig, "username")) // + .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) // + .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // + .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) // + .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) // + .enableDmaapCertAuth( + get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) // + .logUrl(getAsString(feedConfig, "log_url")) // + .build(); + + result.put(cfg.changeIdentifier(), cfg); + } + return result; } - DmaapConsumerConfiguration getDmaapConsumerConfig() { - return new ImmutableDmaapConsumerConfiguration.Builder() - .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt()) - .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString()) - .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString()) - .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString()) - .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) - .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) - .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException { + JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject(); + Set> topics = consumerCfg.entrySet(); + if (topics.size() != 1) { + throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics); + } + JsonObject topic = topics.iterator().next().getValue().getAsJsonObject(); + JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject(); + String topicUrl = getAsString(dmaapInfo, "topic_url"); + + return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl) + .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) + .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) + .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) + .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) + .enableDmaapCertAuth( + get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // .build(); } - FtpesConfig getFtpesConfig() { + public FtpesConfig getFtpesConfig() throws DatafileTaskException { return new ImmutableFtpesConfig.Builder() // - .keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString()) - .keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString()) - .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString()) - .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) // + .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert")) + .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword")) + .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa")) + .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) // .build(); } + + private static JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException { + JsonElement elem = obj.get(memberName); + if (elem == null) { + throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj); + } + return elem; + } + + private static String getAsString(JsonObject obj, String memberName) throws DatafileTaskException { + return get(obj, memberName).getAsString(); + } + + private JsonObject getFeedConfig(String feedName) throws DatafileTaskException { + JsonElement elem = dmaapConfigurationRoot.get(feedName); + if (elem == null) { + elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under + // serviceConfigurationRoot + } + return elem.getAsJsonObject(); + } + + private static JsonArray toArray(JsonElement obj) { + if (obj.isJsonArray()) { + return obj.getAsJsonArray(); + } + JsonArray arr = new JsonArray(); + arr.add(obj); + return arr; + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java deleted file mode 100644 index 597f525f..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java +++ /dev/null @@ -1,110 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.configuration; - -import com.google.gson.JsonObject; -import java.util.Map; -import java.util.Properties; -import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Primary; -import org.springframework.scheduling.annotation.EnableScheduling; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -/** - * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC. - * - * @author Przemysław Wąsala on 9/19/18 - * @author Henrik Andersson - */ -@Configuration -@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers") -@EnableConfigurationProperties -@EnableScheduling -@Primary -public class CloudConfiguration extends AppConfig { - private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class); - private ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider; - private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration; - private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration; - private FtpesConfig ftpesCloudConfiguration; - - @Value("#{systemEnvironment}") - private Properties systemEnvironment; - - @Autowired - public synchronized void setThreadPoolTaskScheduler( - ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) { - this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider; - } - - /** - * Reads the cloud configuration. - */ - public void runTask() { - Map context = MappedDiagnosticContext.initializeTraceContext(); - EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) // - .subscribeOn(Schedulers.parallel()) // - .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) // - .flatMap(this::parseCloudConfig) // - .subscribe(null, this::onError, this::onComplete); - } - - private void onComplete() { - logger.trace("Configuration updated"); - } - - private void onError(Throwable throwable) { - logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); - } - - private synchronized Mono parseCloudConfig(JsonObject jsonObject) { - logger.info("Received application configuration: {}", jsonObject); - CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); - dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); - dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig(); - ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig(); - return Mono.just(this); - } - - @Override - public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration - : super.getDmaapPublisherConfiguration(); - } - - @Override - public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration - : super.getDmaapConsumerConfiguration(); - } - - @Override - public synchronized FtpesConfig getFtpesConfiguration() { - return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java new file mode 100644 index 00000000..fc9ab204 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java @@ -0,0 +1,105 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import java.net.MalformedURLException; +import java.net.URL; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; + +@Value.Immutable +@Value.Style(redactedMask = "####") +@Gson.TypeAdapters +public abstract class ConsumerConfiguration { + @Value.Redacted + public abstract String topicUrl(); + + public abstract String trustStorePath(); + + public abstract String trustStorePasswordPath(); + + public abstract String keyStorePath(); + + public abstract String keyStorePasswordPath(); + + public abstract Boolean enableDmaapCertAuth(); + + public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException { + try { + URL url = new URL(topicUrl()); + String passwd = ""; + String userName = ""; + if (url.getUserInfo() != null) { + String[] userInfo = url.getUserInfo().split(":"); + userName = userInfo[0]; + passwd = userInfo[1]; + } + String urlPath = url.getPath(); + DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); + + return new ImmutableDmaapConsumerConfiguration.Builder() // + .dmaapContentType("application/json") // + .dmaapPortNumber(url.getPort()) // + .dmaapHostName(url.getHost()) // + .dmaapTopicName(path.dmaapTopicName) // + .dmaapProtocol(url.getProtocol()) // + .dmaapUserName(userName) // + .dmaapUserPassword(passwd) // + .trustStorePath(this.trustStorePath()) // + .trustStorePasswordPath(this.trustStorePasswordPath()) // + .keyStorePath(this.keyStorePath()) // + .keyStorePasswordPath(this.keyStorePasswordPath()) // + .enableDmaapCertAuth(this.enableDmaapCertAuth()) // + .consumerId(path.consumerId) // + .consumerGroup(path.consumerGroup) // + .timeoutMs(-1) // + .messageLimit(-1) // + .build(); + } catch (MalformedURLException e) { + throw new DatafileTaskException("Could not parse the URL", e); + } + } + + private class DmaapConsumerUrlPath { + final String dmaapTopicName; + final String consumerGroup; + final String consumerId; + + DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { + this.dmaapTopicName = dmaapTopicName; + this.consumerGroup = consumerGroup; + this.consumerId = consumerId; + } + } + + private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException { + String[] tokens = urlPath.split("/"); // UrlPath: /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12 + if (tokens.length != 5) { + throw new DatafileTaskException("The path has incorrect syntax: " + urlPath); + } + + final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // ex. // /events/unauthenticated.VES_NOTIFICATION_OUTPUT + final String consumerGroup = tokens[3]; // ex. OpenDcae-c12 + final String consumerId = tokens[4]; // ex. C12 + return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId); + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java index 71003f80..62af92a8 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -19,12 +19,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.util.Map; import java.util.Optional; import java.util.Properties; + import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; + import reactor.core.publisher.Mono; /** @@ -53,7 +55,7 @@ class EnvironmentProcessor { } catch (EnvironmentLoaderException e) { return Mono.error(e); } - logger.info("Evaluated environment system variables {}", envProperties); + logger.trace("Evaluated environment system variables {}", envProperties); return Mono.just(envProperties); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java index 3f029359..844699eb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java @@ -21,6 +21,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.io.Serializable; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.springframework.stereotype.Component; @@ -28,7 +29,7 @@ import org.springframework.stereotype.Component; @Component @Value.Immutable -@Value.Style(builder = "new") +@Value.Style(builder = "new", redactedMask = "####") @Gson.TypeAdapters public abstract class FtpesConfig implements Serializable { @@ -38,11 +39,13 @@ public abstract class FtpesConfig implements Serializable { public abstract String keyCert(); @Value.Parameter + @Value.Redacted public abstract String keyPassword(); @Value.Parameter public abstract String trustedCa(); @Value.Parameter + @Value.Redacted public abstract String trustedCaPassword(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java new file mode 100644 index 00000000..5576ed26 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java @@ -0,0 +1,74 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import java.net.MalformedURLException; +import java.net.URL; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + + +@Value.Immutable +@Value.Style(redactedMask = "####") +@Gson.TypeAdapters +public interface PublisherConfiguration { + + String publishUrl(); + + String logUrl(); + + String userName(); + + @Value.Redacted + String passWord(); + + String trustStorePath(); + + String trustStorePasswordPath(); + + String keyStorePath(); + + String keyStorePasswordPath(); + + Boolean enableDmaapCertAuth(); + + String changeIdentifier(); + + default DmaapPublisherConfiguration toDmaap() throws MalformedURLException { + URL url = new URL(publishUrl()); + String urlPath = url.getPath(); + + return new ImmutableDmaapPublisherConfiguration.Builder() // + .dmaapContentType("application/octet-stream") // + .dmaapPortNumber(url.getPort()) // + .dmaapHostName(url.getHost()) // + .dmaapTopicName(urlPath) // + .dmaapProtocol(url.getProtocol()) // + .dmaapUserName(this.userName()) // + .dmaapUserPassword(this.passWord()) // + .trustStorePath(this.trustStorePath()) // + .trustStorePasswordPath(this.trustStorePasswordPath()) // + .keyStorePath(this.keyStorePath()) // + .keyStorePasswordPath(this.keyStorePasswordPath()) // + .enableDmaapCertAuth(this.enableDmaapCertAuth()) // + .build(); + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index b78e4ae5..5835de1a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -16,7 +16,6 @@ package org.onap.dcaegen2.collectors.datafile.configuration; -import io.swagger.annotations.ApiOperation; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -24,7 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; + import javax.annotation.PostConstruct; + import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.slf4j.Logger; @@ -36,6 +37,8 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; + +import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; /** @@ -49,7 +52,6 @@ import reactor.core.publisher.Mono; public class SchedulerConfig { private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15); - private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5); private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1); private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class); private static List> scheduledFutureList = new ArrayList<>(); @@ -57,21 +59,21 @@ public class SchedulerConfig { private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; - private final CloudConfiguration cloudConfiguration; + private final AppConfig configuration; /** * Constructor. * * @param taskScheduler The scheduler used to schedule the tasks. * @param scheduledTasks The scheduler that will actually handle the tasks. - * @param cloudConfiguration The DFC configuration. + * @param configuration The DFC configuration. */ @Autowired public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks, - CloudConfiguration cloudConfiguration) { + AppConfig configuration) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTasks; - this.cloudConfiguration = cloudConfiguration; + this.configuration = configuration; } /** @@ -83,6 +85,7 @@ public class SchedulerConfig { public synchronized Mono> getResponseFromCancellationOfTasks() { scheduledFutureList.forEach(x -> x.cancel(false)); scheduledFutureList.clear(); + configuration.stop(); MDC.setContextMap(contextMap); logger.info("Stopped Datafile workflow"); MDC.clear(); @@ -99,12 +102,11 @@ public class SchedulerConfig { public synchronized boolean tryToStartTask() { contextMap = MappedDiagnosticContext.initializeTraceContext(); logger.info("Start scheduling Datafile workflow"); + configuration.initialize(); + if (scheduledFutureList.isEmpty()) { - scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, - Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); - scheduledFutureList.add( - taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask, - SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); + scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask, + SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); scheduledFutureList .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java index 71242265..5a78261a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java @@ -62,7 +62,7 @@ public class SwaggerConfig extends WebMvcConfigurationSupport { .build(); } - private ApiInfo apiInfo() { + private static ApiInfo apiInfo() { return new ApiInfoBuilder() // .title(API_TITLE) // .description(DESCRIPTION) // diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java index cbd67297..847ca46e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java @@ -41,7 +41,7 @@ public class TomcatHttpConfig { return tomcat; } - private Connector getHttpConnector() { + private static Connector getHttpConnector() { Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL); connector.setScheme("http"); connector.setPort(8100); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index 4716fa87..791f0cf1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -71,7 +71,7 @@ public class ScheduleController { public Mono> startTasks() { return Mono.fromSupplier(schedulerConfig::tryToStartTask) // - .map(this::createStartTaskResponse); + .map(ScheduleController::createStartTaskResponse); } /** @@ -90,7 +90,7 @@ public class ScheduleController { } @ApiOperation(value = "Sends success or error response on starting task execution") - private ResponseEntity createStartTaskResponse(boolean wasScheduled) { + private static ResponseEntity createStartTaskResponse(boolean wasScheduled) { if (wasScheduled) { return new ResponseEntity<>("Datafile Service has been started!", HttpStatus.CREATED); } else { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java index 4c49dd8a..72623db2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java @@ -17,6 +17,7 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import java.util.Optional; + import org.immutables.value.Value; /** @@ -26,11 +27,13 @@ import org.immutables.value.Value; * */ @Value.Immutable +@Value.Style(redactedMask = "####") public interface FileServerData { public String serverAddress(); public String userId(); + @Value.Redacted public String password(); public Optional port(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index c78ae3a3..bb3016ce 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -128,7 +128,7 @@ public class FtpsClient implements FileCollectClient { logger.trace("collectFile fetched: {}", localFileName); } - private int getPort(Optional port) { + private static int getPort(Optional port) { return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; } @@ -180,7 +180,7 @@ public class FtpsClient implements FileCollectClient { logger.warn("Local file {} already created", localFileName); } OutputStream output = new FileOutputStream(localFile); - logger.debug("File {} opened xNF", localFileName); + logger.trace("File {} opened xNF", localFileName); return output; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index 333be92a..bdaf6d4c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -55,7 +55,7 @@ public class SftpClient implements FileCollectClient { try { sftpChannel.get(remoteFile, localFile.toString()); - logger.debug("File {} Download Successfull from xNF", localFile.getFileName()); + logger.trace("File {} Download Successfull from xNF", localFile.getFileName()); } catch (SftpException e) { boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED; throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry); @@ -90,11 +90,11 @@ public class SftpClient implements FileCollectClient { } } - private int getPort(Optional port) { + private static int getPort(Optional port) { return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; } - private Session setUpSession(FileServerData fileServerData) throws JSchException { + private static Session setUpSession(FileServerData fileServerData) throws JSchException { JSch jsch = new JSch(); Session newSession = @@ -105,7 +105,7 @@ public class SftpClient implements FileCollectClient { return newSession; } - private ChannelSftp getChannel(Session session) throws JSchException { + private static ChannelSftp getChannel(Session session) throws JSchException { Channel channel = session.openChannel("sftp"); channel.connect(); return (ChannelSftp) channel; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 0a6b669c..36aae949 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -38,6 +38,7 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; */ @Value.Immutable @Gson.TypeAdapters +@Value.Style(redactedMask = "####") public abstract class FileData { public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/"; @@ -53,6 +54,7 @@ public abstract class FileData { * * @return the URL to use to fetch the file from the PNF */ + @Value.Redacted public abstract String location(); /** @@ -123,7 +125,7 @@ public abstract class FileData { * @return An Optional containing a String array with the user name and password if given, or an empty * Optional if not given. */ - private Optional getUserNameAndPasswordIfGiven(String userInfoString) { + private static Optional getUserNameAndPasswordIfGiven(String userInfoString) { if (userInfoString != null) { String[] userAndPassword = userInfoString.split(":"); if (userAndPassword.length == 2) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java index 63ed0daa..5b8c015e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java @@ -33,6 +33,7 @@ import org.immutables.value.Value; @Value.Immutable @Gson.TypeAdapters +@Value.Style(redactedMask = "####") public interface FilePublishInformation { @SerializedName("productName") @@ -54,6 +55,7 @@ public interface FilePublishInformation { String getTimeZoneOffset(); @SerializedName("location") + @Value.Redacted String getLocation(); @SerializedName("compression") @@ -70,4 +72,6 @@ public interface FilePublishInformation { String getName(); Map getContext(); + + String getChangeIdentifier(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java index 7081d1ac..b8df125b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java @@ -31,12 +31,12 @@ import java.util.Set; */ public abstract class JsonSerializer { + private JsonSerializer() {} - private static Gson gson = - new GsonBuilder() // - .serializeNulls() // - .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) // - .create(); // + private static Gson gson = new GsonBuilder() // + .serializeNulls() // + .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) // + .create(); // /** * Serializes a filePublishInformation. @@ -56,9 +56,10 @@ public abstract class JsonSerializer { private final Set inclusions = Sets.newHashSet("productName", "vendorName", "lastEpochMicrosec", "sourceName", "startEpochMicrosec", "timeZoneOffset", "location", "compression", "fileFormatType", "fileFormatVersion"); + @Override - public boolean shouldSkipField(FieldAttributes f) { - return !inclusions.contains(f.getName()); + public boolean shouldSkipField(FieldAttributes fieldAttributes) { + return !inclusions.contains(fieldAttributes.getName()); } @Override diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 3a3eb3aa..470c4e73 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -22,11 +22,13 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; + import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; + import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -37,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -70,7 +73,6 @@ public class JsonMessageParser { private static final String FILE_FORMAT_VERSION = "fileFormatVersion"; private static final String FILE_READY_CHANGE_TYPE = "FileReady"; - private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES"; /** * The data types available in the event name. @@ -92,7 +94,7 @@ public class JsonMessageParser { * @return a Flux containing messages. */ public Flux getMessagesFromJson(Mono rawMessage) { - return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData); + return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData); } Optional getJsonObjectFromAnArray(JsonElement element) { @@ -123,18 +125,17 @@ public class JsonMessageParser { : getMessagesFromJsonArray(jsonElement); } - private Mono getJsonParserMessage(String message) { - logger.trace("original message from message router: {}", message); + private static Mono getJsonParserMessage(String message) { return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); } - private Flux createMessages(Flux jsonObject) { + private static Flux createMessages(Flux jsonObject) { return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP) : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); } - private Mono transformMessages(JsonObject message) { + private static Mono transformMessages(JsonObject message) { Optional optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { MessageMetaData messageMetaData = optionalMessageMetaData.get(); @@ -159,7 +160,7 @@ public class JsonMessageParser { } - private Optional getMessageMetaData(JsonObject message) { + private static Optional getMessageMetaData(JsonObject message) { List missingValues = new ArrayList<>(); JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues); @@ -182,15 +183,15 @@ public class JsonMessageParser { .changeIdentifier(changeIdentifier) // .changeType(changeType) // .build(); - if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { + if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) { return Optional.of(messageMetaData); } else { String errorMessage = "VES event parsing."; if (!missingValues.isEmpty()) { errorMessage += " Missing data: " + missingValues; } - if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) { - errorMessage += " Change identifier or change type is wrong."; + if (!isChangeTypeCorrect(changeType)) { + errorMessage += " Change type is wrong: " + changeType + " expected: " + FILE_READY_CHANGE_TYPE; } errorMessage += " Message: {}"; logger.error(errorMessage, message); @@ -198,15 +199,12 @@ public class JsonMessageParser { } } - private boolean isChangeTypeCorrect(String changeType) { + private static boolean isChangeTypeCorrect(String changeType) { return FILE_READY_CHANGE_TYPE.equals(changeType); } - private boolean isChangeIdentifierCorrect(String changeIdentifier) { - return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); - } - - private List getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) { + private static List getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, + MessageMetaData messageMetaData) { List res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); @@ -219,7 +217,7 @@ public class JsonMessageParser { return res; } - private Optional getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) { + private static Optional getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) { logger.trace("starting to getFileDataFromJson!"); List missingValues = new ArrayList<>(); @@ -250,15 +248,17 @@ public class JsonMessageParser { } /** - * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, - * example: Noti_RnNode-Ericsson_FileReady + * Gets data from the event name. Defined as: + * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example: + * Noti_RnNode-Ericsson_FileReady * * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}. * @param eventName The event name to get the data from. * @param missingValues List of missing values. The dataType will be added if missing. * @return String of data from event name */ - private String getDataFromEventName(EventNameDataType dataType, String eventName, List missingValues) { + private static String getDataFromEventName(EventNameDataType dataType, String eventName, + List missingValues) { String[] eventArray = eventName.split("_|-"); if (eventArray.length >= 4) { return eventArray[dataType.index]; @@ -269,7 +269,7 @@ public class JsonMessageParser { return ""; } - private String getValueFromJson(JsonObject jsonObject, String jsonKey, List missingValues) { + private static String getValueFromJson(JsonObject jsonObject, String jsonKey, List missingValues) { if (jsonObject.has(jsonKey)) { return jsonObject.get(jsonKey).getAsString(); } else { @@ -278,11 +278,11 @@ public class JsonMessageParser { } } - private boolean containsNotificationFields(JsonObject jsonObject) { + private static boolean containsNotificationFields(JsonObject jsonObject) { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } - private Flux logErrorAndReturnEmptyMessageFlux(String errorMessage) { + private static Flux logErrorAndReturnEmptyMessageFlux(String errorMessage) { logger.error(errorMessage); return Flux.empty(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java index 2b3a0ef6..ff267815 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -67,7 +67,7 @@ public class PublishedFileCache { return publishedFiles.size(); } - private boolean isCachedPublishedFileOutdated(Instant now, Instant then) { + private static boolean isCachedPublishedFileOutdated(Instant now, Instant then) { final int timeToKeepInfoInSeconds = 60 * 60 * 24; return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java index 8d433827..198c1bf1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java @@ -27,7 +27,9 @@ import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.util.Map; import java.util.concurrent.Future; + import javax.net.ssl.SSLContext; + import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; @@ -44,8 +46,6 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; -import org.springframework.web.util.DefaultUriBuilderFactory; -import org.springframework.web.util.UriBuilder; /** * Client used to send requests to DataRouter. @@ -139,22 +139,8 @@ public class DmaapProducerHttpClient { request.addHeader("Authorization", "Basic " + base64Creds); } - /** - * Gets a UriBuilder containing the base URI needed talk to DataRouter. Specific parts can then be - * added to the URI by the user. - * - * @return a UriBuilder containing the base URI needed talk to DataRouter. - */ - public UriBuilder getBaseUri() { - return new DefaultUriBuilderFactory().builder() // - .scheme(configuration.dmaapProtocol()) // - .host(configuration.dmaapHostName()) // - .port(configuration.dmaapPortNumber()); - } - private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout, - Map contextMap) - throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { + Map contextMap) throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index e50ef580..f1d33454 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -21,6 +21,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; @@ -29,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.reactive.function.client.WebClient; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -43,7 +45,7 @@ public class DMaaPMessageConsumer { private final JsonMessageParser jsonMessageParser; private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; - public DMaaPMessageConsumer(AppConfig datafileAppConfig) { + public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException { this.jsonMessageParser = new JsonMessageParser(); this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig); } @@ -69,8 +71,9 @@ public class DMaaPMessageConsumer { return jsonMessageParser.getMessagesFromJson(message); } - private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) { - DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration(); + private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) + throws DatafileTaskException { + DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap(); WebClient client = new DmaapWebClient().fromConfiguration(config).build(); return new DMaaPConsumerReactiveHttpClient(config, client); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index e5dd01e9..1d6baa65 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -25,6 +25,7 @@ import com.google.gson.JsonParser; import java.io.IOException; import java.io.InputStream; +import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; @@ -34,6 +35,8 @@ import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; @@ -46,6 +49,7 @@ import org.slf4j.MDC; import org.springframework.core.io.FileSystemResource; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; +import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Mono; @@ -58,12 +62,9 @@ import reactor.core.publisher.Mono; public class DataRouterPublisher { private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META"; private static final String CONTENT_TYPE = "application/octet-stream"; - private static final String PUBLISH_TOPIC = "publish"; - private static final String DEFAULT_FEED_ID = "1"; private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final AppConfig datafileAppConfig; - private DmaapProducerHttpClient dmaapProducerReactiveHttpClient; public DataRouterPublisher(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; @@ -80,7 +81,6 @@ public class DataRouterPublisher { public Mono publishFile(FilePublishInformation publishInfo, long numRetries, Duration firstBackoff) { MDC.setContextMap(publishInfo.getContext()); - dmaapProducerReactiveHttpClient = resolveClient(); return Mono.just(publishInfo) // .cache() // .flatMap(this::publishFile) // @@ -92,13 +92,14 @@ public class DataRouterPublisher { MDC.setContextMap(publishInfo.getContext()); logger.trace("Entering publishFile with {}", publishInfo); try { + DmaapProducerHttpClient dmaapProducerHttpClient = resolveClient(publishInfo.getChangeIdentifier()); HttpPut put = new HttpPut(); prepareHead(publishInfo, put); prepareBody(publishInfo, put); - dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put); + dmaapProducerHttpClient.addUserCredentialsToHead(put); HttpResponse response = - dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); + dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); logger.trace("{}", response); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { @@ -107,11 +108,18 @@ public class DataRouterPublisher { } } - private void prepareHead(FilePublishInformation publishInfo, HttpPut put) { + private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException { + put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo)); put.addHeader(X_DMAAP_DR_META, metaData.toString()); - put.setURI(getPublishUri(publishInfo.getName())); + URI uri = new DefaultUriBuilderFactory( + datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) // + .builder() // + .pathSegment(publishInfo.getName()) // + .build(); + put.setURI(uri); + MappedDiagnosticContext.appendTraceInfo(put); } @@ -122,14 +130,7 @@ public class DataRouterPublisher { } } - private URI getPublishUri(String fileName) { - return dmaapProducerReactiveHttpClient.getBaseUri() // - .pathSegment(PUBLISH_TOPIC) // - .pathSegment(DEFAULT_FEED_ID) // - .pathSegment(fileName).build(); - } - - private Mono handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) { + private static Mono handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); @@ -145,11 +146,17 @@ public class DataRouterPublisher { return realResource.getInputStream(); } - DmaapPublisherConfiguration resolveConfiguration() { - return datafileAppConfig.getDmaapPublisherConfiguration(); + PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException { + return datafileAppConfig.getPublisherConfiguration(changeIdentifer); } - DmaapProducerHttpClient resolveClient() { - return new DmaapProducerHttpClient(resolveConfiguration()); + DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException { + try { + DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap(); + return new DmaapProducerHttpClient(cfg); + } catch (MalformedURLException e) { + throw new DatafileTaskException("Cannot resolve producer client", e); + } + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 0c62795e..6ddcb541 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -74,12 +74,12 @@ public class FileCollector { return Mono.just(fileData) // .cache() // - .flatMap(fd -> collectFile(fileData, contextMap)) // + .flatMap(fd -> tryCollectFile(fileData, contextMap)) // .retryBackoff(numRetries, firstBackoff) // - .flatMap(this::checkCollectedFile); + .flatMap(FileCollector::checkCollectedFile); } - private Mono checkCollectedFile(Optional info) { + private static Mono checkCollectedFile(Optional info) { if (info.isPresent()) { return Mono.just(info.get()); } else { @@ -88,7 +88,7 @@ public class FileCollector { } } - private Mono> collectFile(FileData fileData, Map context) { + private Mono> tryCollectFile(FileData fileData, Map context) { MDC.setContextMap(context); logger.trace("starting to collectFile {}", fileData.name()); @@ -110,7 +110,7 @@ public class FileCollector { } } catch (Exception throwable) { logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), - throwable.toString()); + throwable.toString(), throwable); return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } } @@ -126,7 +126,7 @@ public class FileCollector { } } - private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile, + private static FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile, Map context) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); @@ -143,6 +143,7 @@ public class FileCollector { .compression(fileData.compression()) // .fileFormatType(fileData.fileFormatType()) // .fileFormatVersion(fileData.fileFormatVersion()) // + .changeIdentifier(fileData.messageMetaData().changeIdentifier()) // .context(context) // .build(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java index e18da248..4d8d679d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -21,18 +21,23 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.io.InputStream; +import java.net.MalformedURLException; import java.net.URI; +import java.net.URISyntaxException; import java.time.Duration; import java.util.Map; + import org.apache.commons.io.IOUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -44,12 +49,9 @@ import org.slf4j.MDC; * */ public class PublishedChecker { - private static final String FEEDLOG_TOPIC = "feedlog"; - private static final String DEFAULT_FEED_ID = "1"; - private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4); + private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4); private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final AppConfig appConfig; /** @@ -66,19 +68,24 @@ public class PublishedChecker { * * @param fileName the name of the file used when it is published. * - * @return true if the file has been published before, false otherwise. + * @return true if the file has been published before, false + * otherwise. + * @throws DatafileTaskException if the check fails */ - public boolean isFilePublished(String fileName, Map contextMap) { + public boolean isFilePublished(String fileName, String changeIdentifier, Map contextMap) + throws DatafileTaskException { MDC.setContextMap(contextMap); - DmaapProducerHttpClient producerClient = resolveClient(); + PublisherConfiguration publisherConfig = resolveConfiguration(changeIdentifier); + + DmaapProducerHttpClient producerClient = resolveClient(publisherConfig); HttpGet getRequest = new HttpGet(); MappedDiagnosticContext.appendTraceInfo(getRequest); - getRequest.setURI(getPublishedQueryUri(fileName, producerClient)); - producerClient.addUserCredentialsToHead(getRequest); - try { + getRequest.setURI(getPublishedQueryUri(fileName, publisherConfig)); + producerClient.addUserCredentialsToHead(getRequest); + HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap); @@ -95,20 +102,23 @@ public class PublishedChecker { } } - private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) { - return producerClient.getBaseUri() // - .pathSegment(FEEDLOG_TOPIC) // - .pathSegment(DEFAULT_FEED_ID) // - .queryParam("type", "pub") // - .queryParam("filename", fileName) // + private static URI getPublishedQueryUri(String fileName, PublisherConfiguration config) throws URISyntaxException { + return new URIBuilder(config.logUrl()) // + .addParameter("type", "pub") // + .addParameter("filename", fileName) // .build(); } - protected DmaapPublisherConfiguration resolveConfiguration() { - return appConfig.getDmaapPublisherConfiguration(); + protected PublisherConfiguration resolveConfiguration(String changeIdentifier) throws DatafileTaskException { + return appConfig.getPublisherConfiguration(changeIdentifier); } - protected DmaapProducerHttpClient resolveClient() { - return new DmaapProducerHttpClient(resolveConfiguration()); + protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) + throws DatafileTaskException { + try { + return new DmaapProducerHttpClient(publisherConfig.toDmaap()); + } catch (MalformedURLException e) { + throw new DatafileTaskException("Cannot create published checker client", e); + } } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index b5fa0c24..bac52659 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -86,13 +87,16 @@ public class ScheduledTasks { threadPoolQueueSize.get()); return; } + if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) { + logger.warn("No configuration loaded, skipping polling for messages"); + return; + } currentNumberOfSubscriptions.incrementAndGet(); Map context = MappedDiagnosticContext.initializeTraceContext(); logger.trace("Execution of tasks was registered"); - applicationConfiguration.loadConfigurationFromFile(); createMainTask(context) // - .subscribe(this::onSuccess, // + .subscribe(ScheduledTasks::onSuccess, // throwable -> { onError(throwable, context); currentNumberOfSubscriptions.decrementAndGet(); @@ -115,6 +119,7 @@ public class ScheduledTasks { .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // .flatMap(fileData -> createMdcContext(fileData, context)) // + .filter(this::isFeedConfigured) // .filter(this::shouldBePublished) // .flatMap(this::fetchFile, false, 1, 1) // .flatMap(this::publishToDataRouter, false, 1, 1) // @@ -124,13 +129,13 @@ public class ScheduledTasks { } private class FileDataWithContext { - FileDataWithContext(FileData fileData, Map context) { + public final FileData fileData; + public final Map context; + + public FileDataWithContext(FileData fileData, Map context) { this.fileData = fileData; this.context = context; } - - final FileData fileData; - final Map context; } /** @@ -160,7 +165,7 @@ public class ScheduledTasks { return this.threadPoolQueueSize.get(); } - protected DMaaPMessageConsumer createConsumerTask() { + protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { return new DMaaPMessageConsumer(this.applicationConfiguration); } @@ -172,17 +177,17 @@ public class ScheduledTasks { return new DataRouterPublisher(applicationConfiguration); } - private void onComplete(Map contextMap) { + private static void onComplete(Map contextMap) { MDC.setContextMap(contextMap); logger.trace("Datafile tasks have been completed"); } - private synchronized void onSuccess(FilePublishInformation publishInfo) { + private static synchronized void onSuccess(FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); logger.info("Datafile file published {}", publishInfo.getInternalLocation()); } - private void onError(Throwable throwable, Map context) { + private static void onError(Throwable throwable, Map context) { MDC.setContextMap(context); logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString()); } @@ -194,11 +199,26 @@ public class ScheduledTasks { return Mono.just(pair); } + private boolean isFeedConfigured(FileDataWithContext fileData) { + if (applicationConfiguration.isFeedConfigured(fileData.fileData.messageMetaData().changeIdentifier())) { + return true; + } else { + logger.info("No feed is configured for: {}, file ignored: {}", + fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name()); + return false; + } + } + private boolean shouldBePublished(FileDataWithContext fileData) { boolean result = false; Path localFilePath = fileData.fileData.getLocalFilePath(); if (publishedFilesCache.put(localFilePath) == null) { - result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context); + try { + result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), + fileData.fileData.messageMetaData().changeIdentifier(), fileData.context); + } catch (DatafileTaskException e) { + logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e); + } } if (!result) { currentNumberOfTasks.decrementAndGet(); @@ -248,13 +268,19 @@ public class ScheduledTasks { */ private Flux fetchMoreFileReadyMessages() { logger.info( - "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}", + "Consuming new file ready messages, current number of tasks: {}, published files: {}, " + + "number of subscriptions: {}", getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); Map context = MDC.getCopyOfContextMap(); - return createConsumerTask() // - .getMessageRouterResponse() // - .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); + try { + return createConsumerTask() // + .getMessageRouterResponse() // + .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); + } catch (Exception e) { + logger.error("Could not create message consumer task", e); + return Flux.empty(); + } } private Flux handleConsumeMessageFailure(Throwable exception, Map context) { @@ -264,7 +290,7 @@ public class ScheduledTasks { return Flux.empty(); } - private void deleteFile(Path localFile, Map context) { + private static void deleteFile(Path localFile, Map context) { MDC.setContextMap(context); logger.trace("Deleting file: {}", localFile); try { diff --git a/datafile-app-server/src/main/resources/datafile_endpoints.json b/datafile-app-server/src/main/resources/datafile_endpoints.json deleted file mode 100644 index 8d45bc84..00000000 --- a/datafile-app-server/src/main/resources/datafile_endpoints.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "configs": { - "dmaap": { - "dmaapConsumerConfiguration": { - "dmaapHostName": "localhost", - "dmaapPortNumber": 2222, - "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT", - "dmaapProtocol": "http", - "dmaapUserName": "", - "dmaapUserPassword": "", - "dmaapContentType": "application/json", - "consumerId": "C12", - "consumerGroup": "OpenDcae-c12", - "timeoutMs": -1, - "messageLimit": 1 - }, - "dmaapProducerConfiguration": { - "dmaapHostName": "localhost", - "dmaapPortNumber": 3907, - "dmaapTopicName": "publish", - "dmaapProtocol": "https", - "dmaapUserName": "dradmin", - "dmaapUserPassword": "dradmin", - "dmaapContentType": "application/octet-stream" - } - }, - "ftp": { - "ftpesConfiguration": { - "keyCert": "config/dfc.jks", - "keyPassword": "secret", - "trustedCa": "config/ftp.jks", - "trustedCaPassword": "secret" - } - } - } -} - diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java index 5be75ab3..b1148a6a 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java @@ -16,25 +16,51 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + +import com.google.common.base.Charsets; +import com.google.common.io.Resources; import com.google.gson.JsonElement; +import com.google.gson.JsonIOException; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.Objects; +import java.util.Map; +import java.util.Properties; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; +import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; /** * Tests the AppConfig. @@ -44,167 +70,285 @@ import org.junit.jupiter.api.Test; */ class AppConfigTest { - private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json"; - private static final boolean CORRECT_JSON = true; - private static final boolean INCORRECT_JSON = false; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + + + private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = // + new ImmutableDmaapConsumerConfiguration.Builder() // + .timeoutMs(-1) // + .dmaapHostName("message-router.onap.svc.cluster.local") // + .dmaapUserName("admin") // + .dmaapUserPassword("admin") // + .dmaapTopicName("events/unauthenticated.VES_NOTIFICATION_OUTPUT") // + .dmaapPortNumber(2222) // + .dmaapContentType("application/json") // + .messageLimit(-1) // + .dmaapProtocol("http") // + .consumerId("C12") // + .consumerGroup("OpenDcae-c12") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); + + private static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() // + .topicUrl( + "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12") + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); + + private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = // + ImmutablePublisherConfiguration.builder() // + .publishUrl("https://message-router.onap.svc.cluster.local:3907/publish/1") // + .logUrl("https://dmaap.example.com/feedlog/972").trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .changeIdentifier("PM_MEAS_FILES") // + .userName("user") // + .passWord("password") // + .build(); + + private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = // + new ImmutableFtpesConfig.Builder() // + .keyCert("/config/dfc.jks") // + .keyPassword("secret") // + .trustedCa("config/ftp.jks") // + .trustedCaPassword("secret") // + .build(); + + private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = // + new ImmutableDmaapPublisherConfiguration.Builder() // + .dmaapTopicName("/publish/1") // + .dmaapUserPassword("password") // + .dmaapPortNumber(3907) // + .dmaapProtocol("https") // + .dmaapContentType("application/octet-stream") // + .dmaapHostName("message-router.onap.svc.cluster.local") // + .dmaapUserName("user") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); + + private static EnvProperties properties() { + return ImmutableEnvProperties.builder() // + .consulHost("host") // + .consulPort(123) // + .cbsName("cbsName") // + .appName("appName") // + .build(); + } - private static AppConfig appConfigUnderTest; + private AppConfig appConfigUnderTest; + private CloudConfigurationProvider cloudConfigurationProvider = mock(CloudConfigurationProvider.class); + private final Map context = MappedDiagnosticContext.initializeTraceContext(); - private static String filePath = - Objects.requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile(); @BeforeEach public void setUp() { appConfigUnderTest = spy(AppConfig.class); + appConfigUnderTest.setCloudConfigurationProvider(cloudConfigurationProvider); + appConfigUnderTest.systemEnvironment = new Properties(); } @Test - public void whenApplicationWasStarted_FilePathIsSet() { + public void whenTheConfigurationFits() throws IOException, DatafileTaskException { // When - appConfigUnderTest.setFilepath(filePath); + doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any()); + appConfigUnderTest.initialize(); // Then - verify(appConfigUnderTest, times(1)).setFilepath(anyString()); - verify(appConfigUnderTest, times(0)).loadConfigurationFromFile(); - Assertions.assertEquals(filePath, appConfigUnderTest.getFilepath()); + verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); + + ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration(); + Assertions.assertNotNull(consumerCfg); + assertThat(consumerCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG); + assertThat(consumerCfg).isEqualToComparingFieldByField(CORRECT_CONSUMER_CONFIG); + + PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER); + Assertions.assertNotNull(publisherCfg); + assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG); + assertThat(publisherCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG); + + FtpesConfig ftpesConfig = appConfigUnderTest.getFtpesConfiguration(); + assertThat(ftpesConfig).isNotNull(); + assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION); } @Test - public void whenTheConfigurationFits_GetFtpsAndDmaapObjectRepresentationConfiguration() throws IOException { - // Given - InputStream inputStream = - new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8))); - + public void whenTheConfigurationFits_twoProducers() throws IOException, DatafileTaskException { // When - appConfigUnderTest.setFilepath(filePath); - doReturn(inputStream).when(appConfigUnderTest).createInputStream(any()); + doReturn(getCorrectJsonTwoProducers()).when(appConfigUnderTest).createInputStream(any()); appConfigUnderTest.loadConfigurationFromFile(); // Then - verify(appConfigUnderTest, times(1)).setFilepath(anyString()); verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); - Assertions.assertNotNull(appConfigUnderTest.getDmaapPublisherConfiguration()); - Assertions.assertEquals(appConfigUnderTest.getDmaapPublisherConfiguration(), - appConfigUnderTest.getDmaapPublisherConfiguration()); - Assertions.assertEquals(appConfigUnderTest.getDmaapConsumerConfiguration(), - appConfigUnderTest.getDmaapConsumerConfiguration()); - Assertions.assertEquals(appConfigUnderTest.getFtpesConfiguration(), appConfigUnderTest.getFtpesConfiguration()); + Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER)); + Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("XX_FILES")); + Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("YY_FILES")); + + assertThat(appConfigUnderTest.getPublisherConfiguration("XX_FILES").publishUrl()) + .isEqualTo("feed01::publish_url"); + assertThat(appConfigUnderTest.getPublisherConfiguration("YY_FILES").publishUrl()) + .isEqualTo("feed01::publish_url"); } @Test - public void whenFileIsNotExist_ThrowIoException() { + public void whenFileIsNotExist_ThrowException() throws DatafileTaskException { // Given - filePath = "/temp.json"; - appConfigUnderTest.setFilepath(filePath); + appConfigUnderTest.setFilepath("/temp.json"); // When appConfigUnderTest.loadConfigurationFromFile(); // Then - verify(appConfigUnderTest, times(1)).setFilepath(anyString()); - verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); - Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); - Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); + assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER)) + .hasMessageContaining("No PublishingConfiguration loaded, changeIdentifier: PM_MEAS_FILES"); + Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); } @Test - public void whenFileIsExistsButJsonIsIncorrect() throws IOException { - // Given - InputStream inputStream = - new ByteArrayInputStream((getJsonConfig(INCORRECT_JSON).getBytes(StandardCharsets.UTF_8))); + public void whenFileIsExistsButJsonIsIncorrect() throws IOException, DatafileTaskException { // When - appConfigUnderTest.setFilepath(filePath); - doReturn(inputStream).when(appConfigUnderTest).createInputStream(any()); + doReturn(getIncorrectJson()).when(appConfigUnderTest).createInputStream(any()); appConfigUnderTest.loadConfigurationFromFile(); // Then - verify(appConfigUnderTest, times(1)).setFilepath(anyString()); verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); - Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); + assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER)) + .hasMessageContaining(CHANGE_IDENTIFIER); Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); - } - @Test - public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException { - // Given - InputStream inputStream = - new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8))); + public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException, DatafileTaskException { + // When - appConfigUnderTest.setFilepath(filePath); - doReturn(inputStream).when(appConfigUnderTest).createInputStream(any()); + doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any()); JsonElement jsonElement = mock(JsonElement.class); when(jsonElement.isJsonObject()).thenReturn(false); doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class)); appConfigUnderTest.loadConfigurationFromFile(); // Then - verify(appConfigUnderTest, times(1)).setFilepath(anyString()); verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); - Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); + assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER)) + .hasMessageContaining(CHANGE_IDENTIFIER); Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); } - private String getJsonConfig(boolean correct) { - JsonObject dmaapConsumerConfigData = new JsonObject(); - dmaapConsumerConfigData.addProperty("dmaapHostName", "localhost"); - dmaapConsumerConfigData.addProperty("dmaapPortNumber", 2222); - dmaapConsumerConfigData.addProperty("dmaapTopicName", "/events/unauthenticated.VES_NOTIFICATION_OUTPUT"); - dmaapConsumerConfigData.addProperty("dmaapProtocol", "http"); - dmaapConsumerConfigData.addProperty("dmaapUserName", "admin"); - dmaapConsumerConfigData.addProperty("dmaapUserPassword", "admin"); - dmaapConsumerConfigData.addProperty("dmaapContentType", "application/json"); - dmaapConsumerConfigData.addProperty("consumerId", "C12"); - dmaapConsumerConfigData.addProperty("consumerGroup", "OpenDcae-c12"); - dmaapConsumerConfigData.addProperty("timeoutMs", -1); - dmaapConsumerConfigData.addProperty("messageLimit", 1); - - JsonObject dmaapProducerConfigData = new JsonObject(); - dmaapProducerConfigData.addProperty("dmaapHostName", "localhost"); - dmaapProducerConfigData.addProperty("dmaapPortNumber", 3907); - dmaapProducerConfigData.addProperty("dmaapTopicName", "publish"); - dmaapProducerConfigData.addProperty("dmaapProtocol", "https"); - if (correct) { - dmaapProducerConfigData.addProperty("dmaapUserName", "dradmin"); - dmaapProducerConfigData.addProperty("dmaapUserPassword", "dradmin"); - dmaapProducerConfigData.addProperty("dmaapContentType", "application/octet-stream"); - } - - JsonObject dmaapConfigs = new JsonObject(); - dmaapConfigs.add("dmaapConsumerConfiguration", dmaapConsumerConfigData); - dmaapConfigs.add("dmaapProducerConfiguration", dmaapProducerConfigData); - - JsonObject ftpesConfigData = new JsonObject(); - ftpesConfigData.addProperty("keyCert", "config/dfc.jks"); - ftpesConfigData.addProperty("keyPassword", "secret"); - ftpesConfigData.addProperty("trustedCa", "config/ftp.jks"); - ftpesConfigData.addProperty("trustedCaPassword", "secret"); - - JsonObject security = new JsonObject(); - security.addProperty("trustStorePath", "trustStorePath"); - security.addProperty("trustStorePasswordPath", "trustStorePasswordPath"); - security.addProperty("keyStorePath", "keyStorePath"); - security.addProperty("keyStorePasswordPath", "keyStorePasswordPath"); - security.addProperty("enableDmaapCertAuth", "enableDmaapCertAuth"); - - JsonObject ftpesConfiguration = new JsonObject(); - ftpesConfiguration.add("ftpesConfiguration", ftpesConfigData); - - JsonObject configs = new JsonObject(); - configs.add("dmaap", dmaapConfigs); - configs.add("ftp", ftpesConfiguration); - configs.add("security", security); - - JsonObject completeJson = new JsonObject(); - completeJson.add("configs", configs); - - return completeJson.toString(); + @Test + public void whenPeriodicConfigRefreshNoEnvironmentVariables() { + ListAppender logAppender = LoggingUtils.getLogListAppender(AppConfig.class); + + Flux task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + + StepVerifier // + .create(task) // + .expectSubscription() // + .expectNextCount(0) // + .verifyComplete(); + + assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined")); + } + + @Test + public void whenPeriodicConfigRefreshNoConsul() { + ListAppender logAppender = LoggingUtils.getLogListAppender(AppConfig.class); + + doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any()); + Mono err = Mono.error(new IOException()); + doReturn(err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any()); + + Flux task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + + StepVerifier // + .create(task) // + .expectSubscription() // + .expectNextCount(0) // + .verifyComplete(); + + assertTrue(logAppender.list.toString() + .contains("Could not refresh application configuration java.io.IOException")); + } + + @Test + public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException { + doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any()); + + Mono json = Mono.just(getJsonRootObject()); + + doReturn(json, json).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any()); + + Flux task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + + StepVerifier // + .create(task) // + .expectSubscription() // + .expectNext(appConfigUnderTest) // + .verifyComplete(); + + Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); + } + + @Test + public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException { + doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any()); + + Mono json = Mono.just(getJsonRootObject()); + Mono err = Mono.error(new IOException()); // no config entry created by the + // dmaap plugin + + doReturn(json, err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any()); + + Flux task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + + StepVerifier // + .create(task) // + .expectSubscription() // + .expectNext(appConfigUnderTest) // + .verifyComplete(); + + Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); + } + + private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException { + JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject(); + return rootObject; + } + + private static InputStream getCorrectJson() throws IOException { + URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test.json"); + String string = Resources.toString(url, Charsets.UTF_8); + return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8))); + } + + private static InputStream getCorrectJsonTwoProducers() throws IOException { + URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test_2producers.json"); + String string = Resources.toString(url, Charsets.UTF_8); + return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8))); + } + + private static InputStream getIncorrectJson() { + String string = "{" + // + " \"configs\": {" + // + " \"dmaap\": {"; // + return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8))); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java deleted file mode 100644 index 07233d95..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.configuration; - -import static org.assertj.core.api.Assertions.assertThat; -import com.google.gson.JsonObject; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; - - -class CloudConfigParserTest { - private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = // - new ImmutableDmaapConsumerConfiguration.Builder() // - .timeoutMs(-1) // - .dmaapHostName("message-router.onap.svc.cluster.local") // - .dmaapUserName("admin") // - .dmaapUserPassword("admin") // - .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") // - .dmaapPortNumber(2222) // - .dmaapContentType("application/json") // - .messageLimit(-1) // - .dmaapProtocol("http") // - .consumerId("C12") // - .consumerGroup("OpenDCAE-c12") // - .trustStorePath("trustStorePath") // - .trustStorePasswordPath("trustStorePasswordPath") // - .keyStorePath("keyStorePath") // - .keyStorePasswordPath("keyStorePasswordPath") // - .enableDmaapCertAuth(true) // - .build(); - - private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = // - new ImmutableDmaapPublisherConfiguration.Builder() // - .dmaapTopicName("publish") // - .dmaapUserPassword("dradmin") // - .dmaapPortNumber(3907) // - .dmaapProtocol("https") // - .dmaapContentType("application/json") // - .dmaapHostName("message-router.onap.svc.cluster.local") // - .dmaapUserName("dradmin") // - .trustStorePath("trustStorePath") // - .trustStorePasswordPath("trustStorePasswordPath") // - .keyStorePath("keyStorePath") // - .keyStorePasswordPath("keyStorePasswordPath") // - .enableDmaapCertAuth(true) // - .build(); - - private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = // - new ImmutableFtpesConfig.Builder() // - .keyCert("/config/dfc.jks") // - .keyPassword("secret") // - .trustedCa("config/ftp.jks") // - .trustedCaPassword("secret") // - .build(); - - private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject()); - - @Test - public void shouldCreateDmaapConsumerConfigurationCorrectly() { - DmaapConsumerConfiguration dmaapConsumerConfig = cloudConfigParser.getDmaapConsumerConfig(); - - assertThat(dmaapConsumerConfig).isNotNull(); - assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG); - } - - @Test - public void shouldCreateDmaapPublisherConfigurationCorrectly() { - DmaapPublisherConfiguration dmaapPublisherConfig = cloudConfigParser.getDmaapPublisherConfig(); - - assertThat(dmaapPublisherConfig).isNotNull(); - assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG); - } - - @Test - public void shouldCreateFtpesConfigurationCorrectly() { - FtpesConfig ftpesConfig = cloudConfigParser.getFtpesConfig(); - - assertThat(ftpesConfig).isNotNull(); - assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION); - } - - public JsonObject getCloudConfigJsonObject() { - JsonObject config = new JsonObject(); - config.addProperty("dmaap.dmaapConsumerConfiguration.timeoutMs", -1); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local"); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserName", "admin"); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserPassword", "admin"); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapTopicName", - "/events/unauthenticated.VES_NOTIFICATION_OUTPUT"); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapPortNumber", 2222); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapContentType", "application/json"); - config.addProperty("dmaap.dmaapConsumerConfiguration.messageLimit", -1); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapProtocol", "http"); - config.addProperty("dmaap.dmaapConsumerConfiguration.consumerId", "C12"); - config.addProperty("dmaap.dmaapConsumerConfiguration.consumerGroup", "OpenDCAE-c12"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapTopicName", "publish"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapProtocol", "https"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapContentType", "application/json"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapPortNumber", 3907); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserName", "dradmin"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserPassword", "dradmin"); - config.addProperty("dmaap.ftpesConfig.keyCert", "/config/dfc.jks"); - config.addProperty("dmaap.ftpesConfig.keyPassword", "secret"); - config.addProperty("dmaap.ftpesConfig.trustedCa", "config/ftp.jks"); - config.addProperty("dmaap.ftpesConfig.trustedCaPassword", "secret"); - - config.addProperty("dmaap.security.trustStorePath", "trustStorePath"); - config.addProperty("dmaap.security.trustStorePasswordPath", "trustStorePasswordPath"); - config.addProperty("dmaap.security.keyStorePath", "keyStorePath"); - config.addProperty("dmaap.security.keyStorePasswordPath", "keyStorePasswordPath"); - config.addProperty("dmaap.security.enableDmaapCertAuth", "true"); - - return config; - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java index 6e2140b4..eba88c33 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -35,26 +36,40 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; + +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; + import reactor.test.StepVerifier; public class SchedulerConfigTest { + private final AppConfig appConfigurationMock = mock(AppConfig.class); + private final TaskScheduler taskSchedulerMock = mock(TaskScheduler.class); + private final ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); + private final SchedulerConfig schedulerUnderTest = + spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock)); + + @BeforeEach + public void setUp() { + doNothing().when(appConfigurationMock).stop(); + doNothing().when(appConfigurationMock).initialize(); + } + @Test public void getResponseFromCancellationOfTasks_success() { + List> scheduledFutureList = new ArrayList<>(); ScheduledFuture scheduledFutureMock = mock(ScheduledFuture.class); scheduledFutureList.add(scheduledFutureMock); SchedulerConfig.setScheduledFutureList(scheduledFutureList); - SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null); - String msg = "Datafile Service has already been stopped!"; StepVerifier.create(schedulerUnderTest.getResponseFromCancellationOfTasks()) .expectNext(new ResponseEntity(msg, HttpStatus.CREATED)) // @@ -68,24 +83,17 @@ public class SchedulerConfigTest { @Test public void tryToStartTaskWhenNotStarted_success() { - TaskScheduler taskSchedulerMock = mock(TaskScheduler.class); - ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); - CloudConfiguration cloudConfigurationMock = mock(CloudConfiguration.class); List> scheduledFutureList = new ArrayList<>(); SchedulerConfig.setScheduledFutureList(scheduledFutureList); SchedulerConfig schedulerUnderTestSpy = - spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, cloudConfigurationMock)); + spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock)); boolean actualResult = schedulerUnderTestSpy.tryToStartTask(); assertTrue(actualResult); - ArgumentCaptor runTaskRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); - verify(taskSchedulerMock).scheduleAtFixedRate(runTaskRunnableCaptor.capture(), any(Instant.class), - eq(Duration.ofMinutes(5))); - ArgumentCaptor scheduleMainDatafileEventTaskCaptor = ArgumentCaptor.forClass(Runnable.class); verify(taskSchedulerMock).scheduleWithFixedDelay(scheduleMainDatafileEventTaskCaptor.capture(), eq(Duration.ofSeconds(15))); @@ -100,22 +108,22 @@ public class SchedulerConfigTest { verify(scheduledTasksMock).executeDatafileMainTask(); verifyNoMoreInteractions(scheduledTasksMock); - runTaskRunnableCaptor.getValue().run(); - verify(cloudConfigurationMock).runTask(); - verifyNoMoreInteractions(cloudConfigurationMock); + verify(appConfigurationMock).initialize(); + verifyNoMoreInteractions(appConfigurationMock); - assertEquals(3, scheduledFutureList.size()); + assertEquals(2, scheduledFutureList.size()); } @Test public void tryToStartTaskWhenAlreadyStarted_shouldReturnFalse() { + doNothing().when(appConfigurationMock).loadConfigurationFromFile(); List> scheduledFutureList = new ArrayList<>(); ScheduledFuture scheduledFutureMock = mock(ScheduledFuture.class); scheduledFutureList.add(scheduledFutureMock); SchedulerConfig.setScheduledFutureList(scheduledFutureList); - SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null); + SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, appConfigurationMock); boolean actualResult = schedulerUnderTest.tryToStartTask(); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java deleted file mode 100644 index 7f6b8c51..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java +++ /dev/null @@ -1,63 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.integration; - -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.verify; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit.jupiter.SpringExtension; -import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; - -/** - * Integration test for the ScheduledXmlContext. - * - * @author Przemysław Wąsala on 3/27/18 - * @author Henrik Andersson - */ - -@Configuration -@ComponentScan -@ExtendWith({ SpringExtension.class }) -@ContextConfiguration(locations = { "classpath:scheduled-context.xml" }) -class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests { - - private static final int WAIT_FOR_SCHEDULING = 1; - - @Autowired - private ScheduledTasks scheduledTask; - - @Test - void testScheduling() { - final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS); - } - - private void verifyDmaapConsumerTask() { - verify(scheduledTask, atLeast(1)).executeDatafileMainTask(); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java deleted file mode 100644 index 83c92ef4..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.model; - -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashMap; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class FilePublishInformationTest { - private static final String PRODUCT_NAME = "NrRadio"; - private static final String VENDOR_NAME = "Ericsson"; - private static final String LAST_EPOCH_MICROSEC = "8745745764578"; - private static final String SOURCE_NAME = "oteNB5309"; - private static final String START_EPOCH_MICROSEC = "8745745764578"; - private static final String TIME_ZONE_OFFSET = "UTC+05:00"; - private static final String NAME = "A20161224.1030-1045.bin.gz"; - private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz"; - private static final Path INTERNAL_LOCATION = Paths.get("target/A20161224.1030-1045.bin.gz"); - private static final String COMPRESSION = "gzip"; - private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; - private static final String FILE_FORMAT_VERSION = "V10"; - - @Test - public void filePublishInformationBuilder_shouldBuildAnObject() { - FilePublishInformation filePublishInformation = ImmutableFilePublishInformation.builder() // - .productName(PRODUCT_NAME) // - .vendorName(VENDOR_NAME) // - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // - .sourceName(SOURCE_NAME) // - .startEpochMicrosec(START_EPOCH_MICROSEC) // - .timeZoneOffset(TIME_ZONE_OFFSET) // - .name(NAME) // - .location(LOCATION) // - .internalLocation(INTERNAL_LOCATION) // - .compression(COMPRESSION) // - .fileFormatType(FILE_FORMAT_TYPE) // - .fileFormatVersion(FILE_FORMAT_VERSION) // - .context(new HashMap()) // - .build(); - - Assertions.assertNotNull(filePublishInformation); - Assertions.assertEquals(PRODUCT_NAME, filePublishInformation.getProductName()); - Assertions.assertEquals(VENDOR_NAME, filePublishInformation.getVendorName()); - Assertions.assertEquals(LAST_EPOCH_MICROSEC, filePublishInformation.getLastEpochMicrosec()); - Assertions.assertEquals(SOURCE_NAME, filePublishInformation.getSourceName()); - Assertions.assertEquals(START_EPOCH_MICROSEC, filePublishInformation.getStartEpochMicrosec()); - Assertions.assertEquals(TIME_ZONE_OFFSET, filePublishInformation.getTimeZoneOffset()); - Assertions.assertEquals(NAME, filePublishInformation.getName()); - Assertions.assertEquals(LOCATION, filePublishInformation.getLocation()); - Assertions.assertEquals(INTERNAL_LOCATION, filePublishInformation.getInternalLocation()); - Assertions.assertEquals(COMPRESSION, filePublishInformation.getCompression()); - Assertions.assertEquals(FILE_FORMAT_TYPE, filePublishInformation.getFileFormatType()); - Assertions.assertEquals(FILE_FORMAT_VERSION, filePublishInformation.getFileFormatVersion()); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java index becfba31..8c7938bf 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java @@ -22,9 +22,12 @@ import static org.mockito.Mockito.spy; import com.google.gson.JsonElement; import com.google.gson.JsonParser; + +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Optional; + import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; @@ -36,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; + import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -65,7 +69,7 @@ class JsonMessageParserTest { private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; @Test - void whenPassingCorrectJson_oneFileReadyMessage() { + void whenPassingCorrectJson_oneFileReadyMessage() throws URISyntaxException { AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // .name(PM_FILE_NAME) // .location(LOCATION) // diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java index e21bbd7b..a71521cd 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java @@ -26,7 +26,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.net.URI; + import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.KeyStoreException; @@ -35,7 +35,9 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; + import javax.net.ssl.SSLContext; + import org.apache.commons.codec.binary.Base64; import org.apache.http.Header; import org.apache.http.HttpResponse; @@ -192,10 +194,4 @@ class DmaapProducerHttpClientTest { Header[] authorizationHeaders = request.getHeaders("Authorization"); assertEquals(base64Creds, authorizationHeaders[0].getValue()); } - - @Test - public void getBaseUri_success() { - URI uri = producerClientUnderTestSpy.getBaseUri().build(); - assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString()); - } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java index 5e737253..574ad18e 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java @@ -77,6 +77,7 @@ public class DMaaPMessageConsumerTest { private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; private static final String FILE_FORMAT_VERSION = "V10"; private static List listOfFilePublishInformation = new ArrayList(); + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; private DMaaPConsumerReactiveHttpClient httpClientMock; @@ -173,6 +174,7 @@ public class DMaaPMessageConsumerTest { .compression(GZIP_COMPRESSION) // .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .changeIdentifier(CHANGE_IDENTIFIER) // .context(new HashMap()) // .build(); listOfFilePublishInformation.add(filePublishInformation); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 8f768d38..463c62c9 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -47,14 +47,12 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.springframework.http.HttpStatus; -import org.springframework.web.util.DefaultUriBuilderFactory; -import org.springframework.web.util.UriBuilder; import reactor.test.StepVerifier; @@ -65,6 +63,7 @@ import reactor.test.StepVerifier; * @author Henrik Andersson */ class DataRouterPublisherTest { + private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; private static final String LAST_EPOCH_MICROSEC = "8745745764578"; @@ -73,6 +72,7 @@ class DataRouterPublisherTest { private static final String TIME_ZONE_OFFSET = "UTC+05:00"; private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; private static final String COMPRESSION = "gzip"; private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; @@ -90,15 +90,17 @@ class DataRouterPublisherTest { private static FilePublishInformation filePublishInformation; private static DmaapProducerHttpClient httpClientMock; private static AppConfig appConfig; - private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class); + private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class); private static Map context = new HashMap<>(); private static DataRouterPublisher publisherTaskUnderTestSpy; + // "https://54.45.333.2:1234/publish/1"; + private static final String PUBLISH_URL = + HTTPS_SCHEME + "://" + HOST + ":" + PORT + "/" + PUBLISH_TOPIC + "/" + FEED_ID; + @BeforeAll public static void setUp() { - when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST); - when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME); - when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT); + when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL); filePublishInformation = ImmutableFilePublishInformation.builder() // .productName(PRODUCT_NAME) // @@ -114,6 +116,7 @@ class DataRouterPublisherTest { .fileFormatType(FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // .context(context) // + .changeIdentifier(CHANGE_IDENTIFIER) // .build(); // appConfig = mock(AppConfig.class); publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig)); @@ -128,7 +131,6 @@ class DataRouterPublisherTest { .verifyComplete(); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class); - verify(httpClientMock).getBaseUri(); verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class)); verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any()); verifyNoMoreInteractions(httpClientMock); @@ -138,6 +140,7 @@ class DataRouterPublisherTest { assertEquals(HTTPS_SCHEME, actualUri.getScheme()); assertEquals(HOST, actualUri.getHost()); assertEquals(PORT, actualUri.getPort()); + Path actualPath = Paths.get(actualUri.getPath()); assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString())); assertTrue(FEED_ID.equals(actualPath.getName(1).toString())); @@ -160,7 +163,8 @@ class DataRouterPublisherTest { assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType")); assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion")); - // Note that the following line checks the number of properties that are sent to the data router. + // Note that the following line checks the number of properties that are sent to the data + // router. // This should be 10 unless the API is updated (which is the fields checked above) assertEquals(10, metaHash.size()); } @@ -185,7 +189,6 @@ class DataRouterPublisherTest { .expectNext(filePublishInformation) // .verifyComplete(); - verify(httpClientMock, times(2)).getBaseUri(); verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class)); verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()); verifyNoMoreInteractions(httpClientMock); @@ -201,7 +204,6 @@ class DataRouterPublisherTest { .expectErrorMessage("Retries exhausted: 1/1") // .verify(); - verify(httpClientMock, times(2)).getBaseUri(); verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class)); verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()); verifyNoMoreInteractions(httpClientMock); @@ -211,12 +213,9 @@ class DataRouterPublisherTest { final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses) throws Exception { httpClientMock = mock(DmaapProducerHttpClient.class); - when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock); - doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(); - doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(); - - UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT); - when(httpClientMock.getBaseUri()).thenReturn(uriBuilder); + when(appConfig.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock); + doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER); + doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(CHANGE_IDENTIFIER); HttpResponse httpResponseMock = mock(HttpResponse.class); if (exception == null) { diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index cad3486d..299a0238 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -83,6 +83,7 @@ public class FileCollectorTest { private static final String FTP_KEY_PASSWORD = "ftpKeyPassword"; private static final String TRUSTED_CA_PATH = "trustedCAPath"; private static final String TRUSTED_CA_PASSWORD = "trustedCAPassword"; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; private static AppConfig appConfigMock = mock(AppConfig.class); private static FtpesConfig ftpesConfigMock = mock(FtpesConfig.class); @@ -132,7 +133,8 @@ public class FileCollectorTest { .compression(GZIP_COMPRESSION) // .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // - .context(new HashMap()) + .context(new HashMap()) // + .changeIdentifier(CHANGE_IDENTIFIER) // .build(); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java index 83643637..44755814 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java @@ -34,10 +34,9 @@ import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.net.URI; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; + import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.StatusLine; @@ -47,55 +46,47 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.springframework.web.util.DefaultUriBuilderFactory; -import org.springframework.web.util.UriBuilder; public class PublishedCheckerTest { + private static final String PUBLISH_URL = "https://54.45.33.2:1234/"; private static final String EMPTY_CONTENT = "[]"; - private static final String FEEDLOG_TOPIC = "feedlog"; - private static final String FEED_ID = "1"; - private static final String HTTPS_SCHEME = "https"; - private static final String HOST = "54.45.33.2"; - private static final int PORT = 1234; private static final String SOURCE_NAME = "oteNB5309"; private static final String FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + private static final String LOG_URI = "https://localhost:3907/feedlog/1"; private static final Map CONTEXT_MAP = new HashMap<>(); - private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class); + private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class); private static AppConfig appConfigMock; private DmaapProducerHttpClient httpClientMock = mock(DmaapProducerHttpClient.class); private PublishedChecker publishedCheckerUnderTestSpy; - /** - * Sets up data for the tests. - */ + @BeforeAll - public static void setUp() { - when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST); - when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME); - when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT); + public static void setUp() throws DatafileTaskException { + when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL); appConfigMock = mock(AppConfig.class); - when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock); + when(appConfigMock.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock); } @Test public void executeWhenNotPublished_returnsFalse() throws Exception { prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null); - boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP); + boolean isPublished = + publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP); assertFalse(isPublished); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class); - verify(httpClientMock).getBaseUri(); verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class)); verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), any(), any()); verifyNoMoreInteractions(httpClientMock); @@ -103,22 +94,17 @@ public class PublishedCheckerTest { HttpUriRequest getRequest = requestCaptor.getValue(); assertTrue(getRequest instanceof HttpGet); URI actualUri = getRequest.getURI(); - assertEquals(HTTPS_SCHEME, actualUri.getScheme()); - assertEquals(HOST, actualUri.getHost()); - assertEquals(PORT, actualUri.getPort()); - Path actualPath = Paths.get(actualUri.getPath()); - assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString())); - assertTrue(FEED_ID.equals(actualPath.getName(1).toString())); - String actualQuery = actualUri.getQuery(); - assertTrue(actualQuery.contains("type=pub")); - assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME)); + // https://localhost:3907/feedlog/1?type=pub&filename=oteNB5309_A20161224.1030-1045.bin.gz + String expUri = LOG_URI + "?type=pub&filename=" + LOCAL_FILE_NAME; + assertEquals(expUri, actualUri.toString()); } @Test public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception { prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null); - boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP); + boolean isPublished = + publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP); assertFalse(isPublished); } @@ -127,7 +113,8 @@ public class PublishedCheckerTest { public void executeWhenPublished_returnsTrue() throws Exception { prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null); - boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP); + boolean isPublished = + publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP); assertTrue(isPublished); } @@ -136,7 +123,8 @@ public class PublishedCheckerTest { public void executeWhenErrorInDataRouter_returnsFalse() throws Exception { prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException("")); - boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP); + boolean isPublished = + publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP); assertFalse(isPublished); } @@ -144,11 +132,9 @@ public class PublishedCheckerTest { final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception { publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock)); - doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration(); - doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient(); - - UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT); - when(httpClientMock.getBaseUri()).thenReturn(uriBuilder); + doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER); + doReturn(LOG_URI).when(publisherConfigurationMock).logUrl(); + doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient(publisherConfigurationMock); HttpResponse httpResponseMock = mock(HttpResponse.class); if (exception == null) { diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 0d5a4231..a1021868 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -42,6 +42,11 @@ import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; @@ -51,8 +56,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformati import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -61,6 +64,7 @@ import reactor.test.StepVerifier; public class ScheduledTasksTest { private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; private AppConfig appConfig = mock(AppConfig.class); private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig)); @@ -72,23 +76,33 @@ public class ScheduledTasksTest { private DataRouterPublisher dataRouterMock; private Map contextMap = new HashMap(); + private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT"; + @BeforeEach - private void setUp() { - DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() // - .dmaapContentType("application/json") // - .dmaapHostName("54.45.33.2") // - .dmaapPortNumber(1234) // - .dmaapProtocol("https") // - .dmaapUserName("DFC") // - .dmaapUserPassword("DFC") // - .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") // + private void setUp() throws DatafileTaskException { + final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() // + .publishUrl(publishUrl) // + .logUrl("") // + .userName("userName") // + .passWord("passWord") // .trustStorePath("trustStorePath") // .trustStorePasswordPath("trustStorePasswordPath") // .keyStorePath("keyStorePath") // .keyStorePasswordPath("keyStorePasswordPath") // .enableDmaapCertAuth(true) // + .changeIdentifier(CHANGE_IDENTIFIER) // .build(); // - doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration(); + final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() // + .topicUrl("topicUrl").trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); + + doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER); + doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration(); + doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER); consumerMock = mock(DMaaPMessageConsumer.class); publishedCheckerMock = mock(PublishedChecker.class); @@ -109,7 +123,7 @@ public class ScheduledTasksTest { .sourceName("") // .startEpochMicrosec("") // .timeZoneOffset("") // - .changeIdentifier("") // + .changeIdentifier(CHANGE_IDENTIFIER) // .changeType("") // .build(); } @@ -164,11 +178,12 @@ public class ScheduledTasksTest { .compression("") // .fileFormatType("") // .fileFormatVersion("") // + .changeIdentifier(CHANGE_IDENTIFIER) // .context(new HashMap()).build(); } @Test - public void notingToConsume() { + public void notingToConsume() throws DatafileTaskException { doReturn(consumerMock).when(testedObject).createConsumerTask(); doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse(); @@ -180,7 +195,7 @@ public class ScheduledTasksTest { } @Test - public void consume_successfulCase() { + public void consume_successfulCase() throws DatafileTaskException { final int noOfEvents = 200; final int noOfFilesPerEvent = 200; final int noOfFiles = noOfEvents * noOfFilesPerEvent; @@ -188,7 +203,7 @@ public class ScheduledTasksTest { Flux fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true); doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse(); - doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any()); + doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any()); Mono collectedFile = Mono.just(filePublishInformation()); doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull()); @@ -212,11 +227,11 @@ public class ScheduledTasksTest { } @Test - public void consume_fetchFailedOnce() { + public void consume_fetchFailedOnce() throws DatafileTaskException { Flux fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse(); - doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any()); + doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any()); Mono collectedFile = Mono.just(filePublishInformation()); Mono error = Mono.error(new Exception("problem")); @@ -246,12 +261,12 @@ public class ScheduledTasksTest { } @Test - public void consume_publishFailedOnce() { + public void consume_publishFailedOnce() throws DatafileTaskException { Flux fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse(); - doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any()); + doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any()); Mono collectedFile = Mono.just(filePublishInformation()); doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull()); @@ -279,7 +294,7 @@ public class ScheduledTasksTest { } @Test - public void consume_successfulCase_sameFileNames() { + public void consume_successfulCase_sameFileNames() throws DatafileTaskException { final int noOfEvents = 1; final int noOfFilesPerEvent = 100; @@ -287,7 +302,7 @@ public class ScheduledTasksTest { Flux fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false); doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse(); - doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any()); + doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any()); Mono collectedFile = Mono.just(filePublishInformation()); doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull()); @@ -303,7 +318,7 @@ public class ScheduledTasksTest { verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull()); - verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull()); + verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); diff --git a/datafile-app-server/src/test/resources/datafile_endpoints.json b/datafile-app-server/src/test/resources/datafile_endpoints.json deleted file mode 100644 index 14dee368..00000000 --- a/datafile-app-server/src/test/resources/datafile_endpoints.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "configs": { - "dmaap": { - "dmaapConsumerConfiguration": { - "consumerId": "C12", - "dmaapHostName": "localhost", - "dmaapPortNumber": 2222, - "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT", - "dmaapProtocol": "http", - "dmaapUserName": "admin", - "dmaapUserPassword": "admin", - "dmaapContentType": "application/json", - "consumerGroup": "OpenDcae-c12", - "timeoutMs": -1, - "messageLimit": 1 - }, - "dmaapProducerConfiguration": { - "dmaapHostName": "localhost", - "dmaapPortNumber": 3907, - "dmaapProtocol": "https", - "dmaapTopicName": "publish", - "dmaapUserName": "dradmin", - "dmaapUserPassword": "dradmin", - "dmaapContentType": "application/octet-stream" - } - }, - "ftp": { - "ftpesConfiguration": { - "keyCert": "/config/dfc.jks", - "keyPassword": "secret", - "trustedCa": "/config/ftp.jks", - "trustedCaPassword": "secret" - } - }, - "security": { - "trustStorePath" : "trustStorePath", - "trustStorePasswordPath" : "trustStorePasswordPath", - "keyStorePath" : "keyStorePath", - "keyStorePasswordPath" : "keyStorePasswordPath", - "enableDmaapCertAuth" : "enableDmaapCertAuth" - } - } -} - diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test.json b/datafile-app-server/src/test/resources/datafile_endpoints_test.json new file mode 100644 index 00000000..4d4d00ab --- /dev/null +++ b/datafile-app-server/src/test/resources/datafile_endpoints_test.json @@ -0,0 +1,31 @@ +{ + "dmaap.ftpesConfig.keyCert":"/config/dfc.jks", + "dmaap.ftpesConfig.keyPassword":"secret", + "dmaap.ftpesConfig.trustedCa":"config/ftp.jks", + "dmaap.ftpesConfig.trustedCaPassword":"secret", + "dmaap.security.trustStorePath":"trustStorePath", + "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath", + "dmaap.security.keyStorePath":"keyStorePath", + "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath", + "dmaap.security.enableDmaapCertAuth":"true", + "dmaap.dmaapProducerConfiguration": { + "changeIdentifier":"PM_MEAS_FILES", + "feedName":"feed00" + }, + "streams_subscribes":{ + "dmaap_subscriber":{ + "dmmap_info":{ + "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + }, + "type":"message_router" + } + }, + "feed00":{ + "username":"user", + "log_url":"https://dmaap.example.com/feedlog/972", + "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1", + "location":"loc00", + "password":"password", + "publisher_id":"972.360gm" + } +} diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json new file mode 100644 index 00000000..a7e2497d --- /dev/null +++ b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json @@ -0,0 +1,49 @@ +{ + "dmaap.ftpesConfig.keyCert":"/config/dfc.jks", + "dmaap.ftpesConfig.keyPassword":"secret", + "dmaap.ftpesConfig.trustedCa":"config/ftp.jks", + "dmaap.ftpesConfig.trustedCaPassword":"secret", + "dmaap.security.trustStorePath":"trustStorePath", + "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath", + "dmaap.security.keyStorePath":"keyStorePath", + "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath", + "dmaap.security.enableDmaapCertAuth":"true", + "dmaap.dmaapProducerConfiguration":[ + { + "changeIdentifier":"PM_MEAS_FILES", + "feedName":"feed00" + }, + { + "changeIdentifier":"XX_FILES", + "feedName":"feed01" + }, + { + "changeIdentifier":"YY_FILES", + "feedName":"feed01" + } + ], + "streams_subscribes":{ + "dmaap_subscriber":{ + "dmmap_info":{ + "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + }, + "type":"message_router" + } + }, + "feed00":{ + "username":"user", + "log_url":"https://dmaap.example.com/feedlog/972", + "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1", + "location":"loc00", + "password":"password", + "publisher_id":"972.360gm" + }, + "feed01":{ + "username":"user", + "log_url":"feed01::log_url", + "publish_url":"feed01::publish_url", + "location":"loc00", + "password":"", + "publisher_id":"" + } +} -- cgit 1.2.3-korg