diff options
Diffstat (limited to 'datafile-app-server/src')
41 files changed, 1054 insertions, 914 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index a38eab8f..d324ca99 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -22,22 +22,38 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.time.Duration; +import java.util.Map; +import java.util.Properties; import java.util.ServiceLoader; + import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; + +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + /** * Holds all configuration for the DFC. * @@ -46,105 +62,174 @@ import org.springframework.stereotype.Component; */ @Component +@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers") @EnableConfigurationProperties @ConfigurationProperties("app") public class AppConfig { - - private static final String CONFIG = "configs"; - private static final String DMAAP = "dmaap"; - private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; - private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; - private static final String FTP = "ftp"; - private static final String FTPES_CONFIGURATION = "ftpesConfiguration"; - private static final String SECURITY = "security"; private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); - private DmaapConsumerConfiguration dmaapConsumerConfiguration; - private DmaapPublisherConfiguration dmaapPublisherConfiguration; + private ConsumerConfiguration dmaapConsumerConfiguration; + private Map<String, PublisherConfiguration> publishingConfiguration; private FtpesConfig ftpesConfiguration; + private CloudConfigurationProvider cloudConfigurationProvider; + @Value("#{systemEnvironment}") + Properties systemEnvironment; + private Disposable refreshConfigTask = null; @NotEmpty private String filepath; - public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + @Autowired + public synchronized void setCloudConfigurationProvider( + CloudConfigurationProvider reactiveCloudConfigurationProvider) { + this.cloudConfigurationProvider = reactiveCloudConfigurationProvider; + } + + public synchronized void setFilepath(String filepath) { + this.filepath = filepath; + } + + /** + * Reads the cloud configuration. + */ + public void initialize() { + stop(); + Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); + loadConfigurationFromFile(); + + refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5)) + .flatMap(count -> createRefreshConfigurationTask(count, context)) + .subscribe(e -> logger.info("Refreshed configuration data"), + throwable -> logger.error("Configuration refresh terminated due to exception", throwable), + () -> logger.error("Configuration refresh terminated")); + } + + public void stop() { + if (refreshConfigTask != null) { + refreshConfigTask.dispose(); + refreshConfigTask = null; + } + } + + public synchronized ConsumerConfiguration getDmaapConsumerConfiguration() { return dmaapConsumerConfiguration; } - public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return dmaapPublisherConfiguration; + public synchronized boolean isFeedConfigured(String changeIdentifier) { + return publishingConfiguration.containsKey(changeIdentifier); + } + + public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier) + throws DatafileTaskException { + + if (publishingConfiguration == null) { + throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier); + } + PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier); + if (cfg == null) { + throw new DatafileTaskException( + "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier); + } + return cfg; } public synchronized FtpesConfig getFtpesConfiguration() { return ftpesConfiguration; } + Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) { + return Flux.just(counter) // + .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) // + .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) // + .flatMap(this::fetchConfiguration); + } + + Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) { + return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) + .onErrorResume(AppConfig::onErrorResume); + } + + private static <R> Mono<R> onErrorResume(Throwable trowable) { + logger.error("Could not refresh application configuration {}", trowable.toString()); + return Mono.empty(); + } + + private Mono<AppConfig> fetchConfiguration(EnvProperties env) { + Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) // + .onErrorResume(AppConfig::onErrorResume); + + // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the + // other ones does not work + EnvProperties dmaapEnv = ImmutableEnvProperties.builder() // + .consulHost(env.consulHost()) // + .consulPort(env.consulPort()) // + .cbsName(env.cbsName()) // + .appName(env.appName() + ":dmaap") // + .build(); // + Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv) + .onErrorResume(t -> Mono.just(new JsonObject())); + + return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) // + .onErrorResume(AppConfig::onErrorResume); + } + + /** + * parse configuration + * + * @param serviceConfigRootObject + * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken + * from the serviceConfigRootObject + * @return this which is updated if successful + */ + private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) { + try { + CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject); + setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(), + parser.getFtpesConfig()); + } catch (DatafileTaskException e) { + logger.error("Could not parse configuration {}", e.toString(), e); + } + return this; + } + /** * Reads the configuration from file. */ - public void loadConfigurationFromFile() { + void loadConfigurationFromFile() { GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); - JsonParser parser = new JsonParser(); - JsonObject jsonObject; + try (InputStream inputStream = createInputStream(filepath)) { - JsonElement rootElement = getJsonElement(parser, inputStream); - if (rootElement.isJsonObject()) { - jsonObject = rootElement.getAsJsonObject(); - FtpesConfig ftpesConfig = deserializeType(gsonBuilder, - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION), - FtpesConfig.class); - DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder, - concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) - .getAsJsonObject(DMAAP_CONSUMER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapConsumerConfiguration.class); - - DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder, - concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) - .getAsJsonObject(DMAAP_PRODUCER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapPublisherConfiguration.class); - - setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig); + JsonParser parser = new JsonParser(); + JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject(); + if (rootObject == null) { + throw new JsonSyntaxException("Root is not a json object"); } + parseCloudConfig(rootObject, rootObject); } catch (JsonSyntaxException | IOException e) { - logger.error("Problem with loading configuration, file: {}", filepath, e); + logger.warn("Local configuration file not loaded: {}", filepath, e); } } - synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration, - DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) { - this.dmaapConsumerConfiguration = consumerConfiguration; - this.dmaapPublisherConfiguration = publisherConfiguration; - this.ftpesConfiguration = ftpesConfig; + private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration, + Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) { + if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) { + logger.error( + "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}", + consumerConfiguration, publisherConfiguration, ftpesConfig); + } else { + this.dmaapConsumerConfiguration = consumerConfiguration; + this.publishingConfiguration = publisherConfiguration; + this.ftpesConfiguration = ftpesConfig; + } } JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { return parser.parse(new InputStreamReader(inputStream)); } - private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject, - @NotNull Class<T> type) { - return gsonBuilder.create().fromJson(jsonObject, type); - } - InputStream createInputStream(@NotNull String filepath) throws IOException { return new BufferedInputStream(new FileInputStream(filepath)); } - synchronized String getFilepath() { - return this.filepath; - } - - public synchronized void setFilepath(String filepath) { - this.filepath = filepath; - } - - private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) { - source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue())); - return target; - } - } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index 6b7860c4..3ac6b2c6 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -18,11 +18,17 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; /** @@ -32,63 +38,106 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ public class CloudConfigParser { - private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath"; private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath"; private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath"; private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath"; private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth"; - private final JsonObject jsonObject; + private final JsonObject serviceConfigurationRoot; + private final JsonObject dmaapConfigurationRoot; - CloudConfigParser(JsonObject jsonObject) { - this.jsonObject = jsonObject; + public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) { + this.serviceConfigurationRoot = serviceConfigurationRoot; + this.dmaapConfigurationRoot = dmaapConfigurationRoot; } - DmaapPublisherConfiguration getDmaapPublisherConfig() { - return new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString()) - .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) - .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) - .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // - .build(); + public Map<String, PublisherConfiguration> getDmaapPublisherConfig() throws DatafileTaskException { + Iterator<JsonElement> producerCfgs = + toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator(); + + Map<String, PublisherConfiguration> result = new HashMap<>(); + + while (producerCfgs.hasNext()) { + JsonObject producerCfg = producerCfgs.next().getAsJsonObject(); + String feedName = getAsString(producerCfg, "feedName"); + JsonObject feedConfig = getFeedConfig(feedName); + + PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() // + .publishUrl(getAsString(feedConfig, "publish_url")) // + .passWord(getAsString(feedConfig, "password")) // + .userName(getAsString(feedConfig, "username")) // + .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) // + .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // + .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) // + .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) // + .enableDmaapCertAuth( + get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) // + .logUrl(getAsString(feedConfig, "log_url")) // + .build(); + + result.put(cfg.changeIdentifier(), cfg); + } + return result; } - DmaapConsumerConfiguration getDmaapConsumerConfig() { - return new ImmutableDmaapConsumerConfiguration.Builder() - .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt()) - .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString()) - .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString()) - .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString()) - .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) - .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) - .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException { + JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject(); + Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet(); + if (topics.size() != 1) { + throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics); + } + JsonObject topic = topics.iterator().next().getValue().getAsJsonObject(); + JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject(); + String topicUrl = getAsString(dmaapInfo, "topic_url"); + + return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl) + .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) + .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) + .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) + .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) + .enableDmaapCertAuth( + get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // .build(); } - FtpesConfig getFtpesConfig() { + public FtpesConfig getFtpesConfig() throws DatafileTaskException { return new ImmutableFtpesConfig.Builder() // - .keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString()) - .keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString()) - .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString()) - .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) // + .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert")) + .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword")) + .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa")) + .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) // .build(); } + + private static JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException { + JsonElement elem = obj.get(memberName); + if (elem == null) { + throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj); + } + return elem; + } + + private static String getAsString(JsonObject obj, String memberName) throws DatafileTaskException { + return get(obj, memberName).getAsString(); + } + + private JsonObject getFeedConfig(String feedName) throws DatafileTaskException { + JsonElement elem = dmaapConfigurationRoot.get(feedName); + if (elem == null) { + elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under + // serviceConfigurationRoot + } + return elem.getAsJsonObject(); + } + + private static JsonArray toArray(JsonElement obj) { + if (obj.isJsonArray()) { + return obj.getAsJsonArray(); + } + JsonArray arr = new JsonArray(); + arr.add(obj); + return arr; + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java deleted file mode 100644 index 597f525f..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java +++ /dev/null @@ -1,110 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.configuration; - -import com.google.gson.JsonObject; -import java.util.Map; -import java.util.Properties; -import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Primary; -import org.springframework.scheduling.annotation.EnableScheduling; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -/** - * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC. - * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -@Configuration -@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers") -@EnableConfigurationProperties -@EnableScheduling -@Primary -public class CloudConfiguration extends AppConfig { - private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class); - private ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider; - private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration; - private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration; - private FtpesConfig ftpesCloudConfiguration; - - @Value("#{systemEnvironment}") - private Properties systemEnvironment; - - @Autowired - public synchronized void setThreadPoolTaskScheduler( - ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) { - this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider; - } - - /** - * Reads the cloud configuration. - */ - public void runTask() { - Map<String,String> context = MappedDiagnosticContext.initializeTraceContext(); - EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) // - .subscribeOn(Schedulers.parallel()) // - .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) // - .flatMap(this::parseCloudConfig) // - .subscribe(null, this::onError, this::onComplete); - } - - private void onComplete() { - logger.trace("Configuration updated"); - } - - private void onError(Throwable throwable) { - logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); - } - - private synchronized Mono<CloudConfiguration> parseCloudConfig(JsonObject jsonObject) { - logger.info("Received application configuration: {}", jsonObject); - CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); - dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); - dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig(); - ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig(); - return Mono.just(this); - } - - @Override - public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration - : super.getDmaapPublisherConfiguration(); - } - - @Override - public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration - : super.getDmaapConsumerConfiguration(); - } - - @Override - public synchronized FtpesConfig getFtpesConfiguration() { - return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java new file mode 100644 index 00000000..fc9ab204 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java @@ -0,0 +1,105 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import java.net.MalformedURLException; +import java.net.URL; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; + +@Value.Immutable +@Value.Style(redactedMask = "####") +@Gson.TypeAdapters +public abstract class ConsumerConfiguration { + @Value.Redacted + public abstract String topicUrl(); + + public abstract String trustStorePath(); + + public abstract String trustStorePasswordPath(); + + public abstract String keyStorePath(); + + public abstract String keyStorePasswordPath(); + + public abstract Boolean enableDmaapCertAuth(); + + public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException { + try { + URL url = new URL(topicUrl()); + String passwd = ""; + String userName = ""; + if (url.getUserInfo() != null) { + String[] userInfo = url.getUserInfo().split(":"); + userName = userInfo[0]; + passwd = userInfo[1]; + } + String urlPath = url.getPath(); + DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); + + return new ImmutableDmaapConsumerConfiguration.Builder() // + .dmaapContentType("application/json") // + .dmaapPortNumber(url.getPort()) // + .dmaapHostName(url.getHost()) // + .dmaapTopicName(path.dmaapTopicName) // + .dmaapProtocol(url.getProtocol()) // + .dmaapUserName(userName) // + .dmaapUserPassword(passwd) // + .trustStorePath(this.trustStorePath()) // + .trustStorePasswordPath(this.trustStorePasswordPath()) // + .keyStorePath(this.keyStorePath()) // + .keyStorePasswordPath(this.keyStorePasswordPath()) // + .enableDmaapCertAuth(this.enableDmaapCertAuth()) // + .consumerId(path.consumerId) // + .consumerGroup(path.consumerGroup) // + .timeoutMs(-1) // + .messageLimit(-1) // + .build(); + } catch (MalformedURLException e) { + throw new DatafileTaskException("Could not parse the URL", e); + } + } + + private class DmaapConsumerUrlPath { + final String dmaapTopicName; + final String consumerGroup; + final String consumerId; + + DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { + this.dmaapTopicName = dmaapTopicName; + this.consumerGroup = consumerGroup; + this.consumerId = consumerId; + } + } + + private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException { + String[] tokens = urlPath.split("/"); // UrlPath: /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12 + if (tokens.length != 5) { + throw new DatafileTaskException("The path has incorrect syntax: " + urlPath); + } + + final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // ex. // /events/unauthenticated.VES_NOTIFICATION_OUTPUT + final String consumerGroup = tokens[3]; // ex. OpenDcae-c12 + final String consumerId = tokens[4]; // ex. C12 + return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId); + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java index 71003f80..62af92a8 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -19,12 +19,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.util.Map; import java.util.Optional; import java.util.Properties; + import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; + import reactor.core.publisher.Mono; /** @@ -53,7 +55,7 @@ class EnvironmentProcessor { } catch (EnvironmentLoaderException e) { return Mono.error(e); } - logger.info("Evaluated environment system variables {}", envProperties); + logger.trace("Evaluated environment system variables {}", envProperties); return Mono.just(envProperties); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java index 3f029359..844699eb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java @@ -21,6 +21,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.io.Serializable; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.springframework.stereotype.Component; @@ -28,7 +29,7 @@ import org.springframework.stereotype.Component; @Component @Value.Immutable -@Value.Style(builder = "new") +@Value.Style(builder = "new", redactedMask = "####") @Gson.TypeAdapters public abstract class FtpesConfig implements Serializable { @@ -38,11 +39,13 @@ public abstract class FtpesConfig implements Serializable { public abstract String keyCert(); @Value.Parameter + @Value.Redacted public abstract String keyPassword(); @Value.Parameter public abstract String trustedCa(); @Value.Parameter + @Value.Redacted public abstract String trustedCaPassword(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java new file mode 100644 index 00000000..5576ed26 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java @@ -0,0 +1,74 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import java.net.MalformedURLException; +import java.net.URL; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + + +@Value.Immutable +@Value.Style(redactedMask = "####") +@Gson.TypeAdapters +public interface PublisherConfiguration { + + String publishUrl(); + + String logUrl(); + + String userName(); + + @Value.Redacted + String passWord(); + + String trustStorePath(); + + String trustStorePasswordPath(); + + String keyStorePath(); + + String keyStorePasswordPath(); + + Boolean enableDmaapCertAuth(); + + String changeIdentifier(); + + default DmaapPublisherConfiguration toDmaap() throws MalformedURLException { + URL url = new URL(publishUrl()); + String urlPath = url.getPath(); + + return new ImmutableDmaapPublisherConfiguration.Builder() // + .dmaapContentType("application/octet-stream") // + .dmaapPortNumber(url.getPort()) // + .dmaapHostName(url.getHost()) // + .dmaapTopicName(urlPath) // + .dmaapProtocol(url.getProtocol()) // + .dmaapUserName(this.userName()) // + .dmaapUserPassword(this.passWord()) // + .trustStorePath(this.trustStorePath()) // + .trustStorePasswordPath(this.trustStorePasswordPath()) // + .keyStorePath(this.keyStorePath()) // + .keyStorePasswordPath(this.keyStorePasswordPath()) // + .enableDmaapCertAuth(this.enableDmaapCertAuth()) // + .build(); + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index b78e4ae5..5835de1a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -16,7 +16,6 @@ package org.onap.dcaegen2.collectors.datafile.configuration; -import io.swagger.annotations.ApiOperation; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -24,7 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; + import javax.annotation.PostConstruct; + import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.slf4j.Logger; @@ -36,6 +37,8 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; + +import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; /** @@ -49,7 +52,6 @@ import reactor.core.publisher.Mono; public class SchedulerConfig { private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15); - private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5); private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1); private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class); private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); @@ -57,21 +59,21 @@ public class SchedulerConfig { private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; - private final CloudConfiguration cloudConfiguration; + private final AppConfig configuration; /** * Constructor. * * @param taskScheduler The scheduler used to schedule the tasks. * @param scheduledTasks The scheduler that will actually handle the tasks. - * @param cloudConfiguration The DFC configuration. + * @param configuration The DFC configuration. */ @Autowired public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks, - CloudConfiguration cloudConfiguration) { + AppConfig configuration) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTasks; - this.cloudConfiguration = cloudConfiguration; + this.configuration = configuration; } /** @@ -83,6 +85,7 @@ public class SchedulerConfig { public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() { scheduledFutureList.forEach(x -> x.cancel(false)); scheduledFutureList.clear(); + configuration.stop(); MDC.setContextMap(contextMap); logger.info("Stopped Datafile workflow"); MDC.clear(); @@ -99,12 +102,11 @@ public class SchedulerConfig { public synchronized boolean tryToStartTask() { contextMap = MappedDiagnosticContext.initializeTraceContext(); logger.info("Start scheduling Datafile workflow"); + configuration.initialize(); + if (scheduledFutureList.isEmpty()) { - scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, - Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); - scheduledFutureList.add( - taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask, - SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); + scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask, + SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); scheduledFutureList .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java index 71242265..5a78261a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java @@ -62,7 +62,7 @@ public class SwaggerConfig extends WebMvcConfigurationSupport { .build(); } - private ApiInfo apiInfo() { + private static ApiInfo apiInfo() { return new ApiInfoBuilder() // .title(API_TITLE) // .description(DESCRIPTION) // diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java index cbd67297..847ca46e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java @@ -41,7 +41,7 @@ public class TomcatHttpConfig { return tomcat; } - private Connector getHttpConnector() { + private static Connector getHttpConnector() { Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL); connector.setScheme("http"); connector.setPort(8100); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index 4716fa87..791f0cf1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -71,7 +71,7 @@ public class ScheduleController { public Mono<ResponseEntity<String>> startTasks() { return Mono.fromSupplier(schedulerConfig::tryToStartTask) // - .map(this::createStartTaskResponse); + .map(ScheduleController::createStartTaskResponse); } /** @@ -90,7 +90,7 @@ public class ScheduleController { } @ApiOperation(value = "Sends success or error response on starting task execution") - private ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) { + private static ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) { if (wasScheduled) { return new ResponseEntity<>("Datafile Service has been started!", HttpStatus.CREATED); } else { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java index 4c49dd8a..72623db2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java @@ -17,6 +17,7 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import java.util.Optional; + import org.immutables.value.Value; /** @@ -26,11 +27,13 @@ import org.immutables.value.Value; * */ @Value.Immutable +@Value.Style(redactedMask = "####") public interface FileServerData { public String serverAddress(); public String userId(); + @Value.Redacted public String password(); public Optional<Integer> port(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index c78ae3a3..bb3016ce 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -128,7 +128,7 @@ public class FtpsClient implements FileCollectClient { logger.trace("collectFile fetched: {}", localFileName); } - private int getPort(Optional<Integer> port) { + private static int getPort(Optional<Integer> port) { return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; } @@ -180,7 +180,7 @@ public class FtpsClient implements FileCollectClient { logger.warn("Local file {} already created", localFileName); } OutputStream output = new FileOutputStream(localFile); - logger.debug("File {} opened xNF", localFileName); + logger.trace("File {} opened xNF", localFileName); return output; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index 333be92a..bdaf6d4c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -55,7 +55,7 @@ public class SftpClient implements FileCollectClient { try { sftpChannel.get(remoteFile, localFile.toString()); - logger.debug("File {} Download Successfull from xNF", localFile.getFileName()); + logger.trace("File {} Download Successfull from xNF", localFile.getFileName()); } catch (SftpException e) { boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED; throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry); @@ -90,11 +90,11 @@ public class SftpClient implements FileCollectClient { } } - private int getPort(Optional<Integer> port) { + private static int getPort(Optional<Integer> port) { return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; } - private Session setUpSession(FileServerData fileServerData) throws JSchException { + private static Session setUpSession(FileServerData fileServerData) throws JSchException { JSch jsch = new JSch(); Session newSession = @@ -105,7 +105,7 @@ public class SftpClient implements FileCollectClient { return newSession; } - private ChannelSftp getChannel(Session session) throws JSchException { + private static ChannelSftp getChannel(Session session) throws JSchException { Channel channel = session.openChannel("sftp"); channel.connect(); return (ChannelSftp) channel; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 0a6b669c..36aae949 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -38,6 +38,7 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; */ @Value.Immutable @Gson.TypeAdapters +@Value.Style(redactedMask = "####") public abstract class FileData { public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/"; @@ -53,6 +54,7 @@ public abstract class FileData { * * @return the URL to use to fetch the file from the PNF */ + @Value.Redacted public abstract String location(); /** @@ -123,7 +125,7 @@ public abstract class FileData { * @return An <code>Optional</code> containing a String array with the user name and password if given, or an empty * <code>Optional</code> if not given. */ - private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) { + private static Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) { if (userInfoString != null) { String[] userAndPassword = userInfoString.split(":"); if (userAndPassword.length == 2) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java index 63ed0daa..5b8c015e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java @@ -33,6 +33,7 @@ import org.immutables.value.Value; @Value.Immutable @Gson.TypeAdapters +@Value.Style(redactedMask = "####") public interface FilePublishInformation { @SerializedName("productName") @@ -54,6 +55,7 @@ public interface FilePublishInformation { String getTimeZoneOffset(); @SerializedName("location") + @Value.Redacted String getLocation(); @SerializedName("compression") @@ -70,4 +72,6 @@ public interface FilePublishInformation { String getName(); Map<String, String> getContext(); + + String getChangeIdentifier(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java index 7081d1ac..b8df125b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java @@ -31,12 +31,12 @@ import java.util.Set; */ public abstract class JsonSerializer { + private JsonSerializer() {} - private static Gson gson = - new GsonBuilder() // - .serializeNulls() // - .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) // - .create(); // + private static Gson gson = new GsonBuilder() // + .serializeNulls() // + .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) // + .create(); // /** * Serializes a <code>filePublishInformation</code>. @@ -56,9 +56,10 @@ public abstract class JsonSerializer { private final Set<String> inclusions = Sets.newHashSet("productName", "vendorName", "lastEpochMicrosec", "sourceName", "startEpochMicrosec", "timeZoneOffset", "location", "compression", "fileFormatType", "fileFormatVersion"); + @Override - public boolean shouldSkipField(FieldAttributes f) { - return !inclusions.contains(f.getName()); + public boolean shouldSkipField(FieldAttributes fieldAttributes) { + return !inclusions.contains(fieldAttributes.getName()); } @Override diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 3a3eb3aa..470c4e73 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -22,11 +22,13 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; + import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; + import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -37,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -70,7 +73,6 @@ public class JsonMessageParser { private static final String FILE_FORMAT_VERSION = "fileFormatVersion"; private static final String FILE_READY_CHANGE_TYPE = "FileReady"; - private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES"; /** * The data types available in the event name. @@ -92,7 +94,7 @@ public class JsonMessageParser { * @return a <code>Flux</code> containing messages. */ public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) { - return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData); + return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData); } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { @@ -123,18 +125,17 @@ public class JsonMessageParser { : getMessagesFromJsonArray(jsonElement); } - private Mono<JsonElement> getJsonParserMessage(String message) { - logger.trace("original message from message router: {}", message); + private static Mono<JsonElement> getJsonParserMessage(String message) { return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); } - private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { + private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP) : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); } - private Mono<FileReadyMessage> transformMessages(JsonObject message) { + private static Mono<FileReadyMessage> transformMessages(JsonObject message) { Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { MessageMetaData messageMetaData = optionalMessageMetaData.get(); @@ -159,7 +160,7 @@ public class JsonMessageParser { } - private Optional<MessageMetaData> getMessageMetaData(JsonObject message) { + private static Optional<MessageMetaData> getMessageMetaData(JsonObject message) { List<String> missingValues = new ArrayList<>(); JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues); @@ -182,15 +183,15 @@ public class JsonMessageParser { .changeIdentifier(changeIdentifier) // .changeType(changeType) // .build(); - if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { + if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) { return Optional.of(messageMetaData); } else { String errorMessage = "VES event parsing."; if (!missingValues.isEmpty()) { errorMessage += " Missing data: " + missingValues; } - if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) { - errorMessage += " Change identifier or change type is wrong."; + if (!isChangeTypeCorrect(changeType)) { + errorMessage += " Change type is wrong: " + changeType + " expected: " + FILE_READY_CHANGE_TYPE; } errorMessage += " Message: {}"; logger.error(errorMessage, message); @@ -198,15 +199,12 @@ public class JsonMessageParser { } } - private boolean isChangeTypeCorrect(String changeType) { + private static boolean isChangeTypeCorrect(String changeType) { return FILE_READY_CHANGE_TYPE.equals(changeType); } - private boolean isChangeIdentifierCorrect(String changeIdentifier) { - return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); - } - - private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) { + private static List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, + MessageMetaData messageMetaData) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); @@ -219,7 +217,7 @@ public class JsonMessageParser { return res; } - private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) { + private static Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) { logger.trace("starting to getFileDataFromJson!"); List<String> missingValues = new ArrayList<>(); @@ -250,15 +248,17 @@ public class JsonMessageParser { } /** - * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, - * example: Noti_RnNode-Ericsson_FileReady + * Gets data from the event name. Defined as: + * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example: + * Noti_RnNode-Ericsson_FileReady * * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}. * @param eventName The event name to get the data from. * @param missingValues List of missing values. The dataType will be added if missing. * @return String of data from event name */ - private String getDataFromEventName(EventNameDataType dataType, String eventName, List<String> missingValues) { + private static String getDataFromEventName(EventNameDataType dataType, String eventName, + List<String> missingValues) { String[] eventArray = eventName.split("_|-"); if (eventArray.length >= 4) { return eventArray[dataType.index]; @@ -269,7 +269,7 @@ public class JsonMessageParser { return ""; } - private String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) { + private static String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) { if (jsonObject.has(jsonKey)) { return jsonObject.get(jsonKey).getAsString(); } else { @@ -278,11 +278,11 @@ public class JsonMessageParser { } } - private boolean containsNotificationFields(JsonObject jsonObject) { + private static boolean containsNotificationFields(JsonObject jsonObject) { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } - private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) { + private static Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) { logger.error(errorMessage); return Flux.empty(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java index 2b3a0ef6..ff267815 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -67,7 +67,7 @@ public class PublishedFileCache { return publishedFiles.size(); } - private boolean isCachedPublishedFileOutdated(Instant now, Instant then) { + private static boolean isCachedPublishedFileOutdated(Instant now, Instant then) { final int timeToKeepInfoInSeconds = 60 * 60 * 24; return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java index 8d433827..198c1bf1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java @@ -27,7 +27,9 @@ import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.util.Map; import java.util.concurrent.Future; + import javax.net.ssl.SSLContext; + import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; @@ -44,8 +46,6 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; -import org.springframework.web.util.DefaultUriBuilderFactory; -import org.springframework.web.util.UriBuilder; /** * Client used to send requests to DataRouter. @@ -139,22 +139,8 @@ public class DmaapProducerHttpClient { request.addHeader("Authorization", "Basic " + base64Creds); } - /** - * Gets a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. Specific parts can then be - * added to the URI by the user. - * - * @return a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. - */ - public UriBuilder getBaseUri() { - return new DefaultUriBuilderFactory().builder() // - .scheme(configuration.dmaapProtocol()) // - .host(configuration.dmaapHostName()) // - .port(configuration.dmaapPortNumber()); - } - private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout, - Map<String, String> contextMap) - throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { + Map<String, String> contextMap) throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index e50ef580..f1d33454 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -21,6 +21,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; @@ -29,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.reactive.function.client.WebClient; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -43,7 +45,7 @@ public class DMaaPMessageConsumer { private final JsonMessageParser jsonMessageParser; private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; - public DMaaPMessageConsumer(AppConfig datafileAppConfig) { + public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException { this.jsonMessageParser = new JsonMessageParser(); this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig); } @@ -69,8 +71,9 @@ public class DMaaPMessageConsumer { return jsonMessageParser.getMessagesFromJson(message); } - private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) { - DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration(); + private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) + throws DatafileTaskException { + DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap(); WebClient client = new DmaapWebClient().fromConfiguration(config).build(); return new DMaaPConsumerReactiveHttpClient(config, client); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index e5dd01e9..1d6baa65 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -25,6 +25,7 @@ import com.google.gson.JsonParser; import java.io.IOException; import java.io.InputStream; +import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; @@ -34,6 +35,8 @@ import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; @@ -46,6 +49,7 @@ import org.slf4j.MDC; import org.springframework.core.io.FileSystemResource; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; +import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Mono; @@ -58,12 +62,9 @@ import reactor.core.publisher.Mono; public class DataRouterPublisher { private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META"; private static final String CONTENT_TYPE = "application/octet-stream"; - private static final String PUBLISH_TOPIC = "publish"; - private static final String DEFAULT_FEED_ID = "1"; private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final AppConfig datafileAppConfig; - private DmaapProducerHttpClient dmaapProducerReactiveHttpClient; public DataRouterPublisher(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; @@ -80,7 +81,6 @@ public class DataRouterPublisher { public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries, Duration firstBackoff) { MDC.setContextMap(publishInfo.getContext()); - dmaapProducerReactiveHttpClient = resolveClient(); return Mono.just(publishInfo) // .cache() // .flatMap(this::publishFile) // @@ -92,13 +92,14 @@ public class DataRouterPublisher { MDC.setContextMap(publishInfo.getContext()); logger.trace("Entering publishFile with {}", publishInfo); try { + DmaapProducerHttpClient dmaapProducerHttpClient = resolveClient(publishInfo.getChangeIdentifier()); HttpPut put = new HttpPut(); prepareHead(publishInfo, put); prepareBody(publishInfo, put); - dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put); + dmaapProducerHttpClient.addUserCredentialsToHead(put); HttpResponse response = - dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); + dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); logger.trace("{}", response); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { @@ -107,11 +108,18 @@ public class DataRouterPublisher { } } - private void prepareHead(FilePublishInformation publishInfo, HttpPut put) { + private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException { + put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo)); put.addHeader(X_DMAAP_DR_META, metaData.toString()); - put.setURI(getPublishUri(publishInfo.getName())); + URI uri = new DefaultUriBuilderFactory( + datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) // + .builder() // + .pathSegment(publishInfo.getName()) // + .build(); + put.setURI(uri); + MappedDiagnosticContext.appendTraceInfo(put); } @@ -122,14 +130,7 @@ public class DataRouterPublisher { } } - private URI getPublishUri(String fileName) { - return dmaapProducerReactiveHttpClient.getBaseUri() // - .pathSegment(PUBLISH_TOPIC) // - .pathSegment(DEFAULT_FEED_ID) // - .pathSegment(fileName).build(); - } - - private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) { + private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); @@ -145,11 +146,17 @@ public class DataRouterPublisher { return realResource.getInputStream(); } - DmaapPublisherConfiguration resolveConfiguration() { - return datafileAppConfig.getDmaapPublisherConfiguration(); + PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException { + return datafileAppConfig.getPublisherConfiguration(changeIdentifer); } - DmaapProducerHttpClient resolveClient() { - return new DmaapProducerHttpClient(resolveConfiguration()); + DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException { + try { + DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap(); + return new DmaapProducerHttpClient(cfg); + } catch (MalformedURLException e) { + throw new DatafileTaskException("Cannot resolve producer client", e); + } + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 0c62795e..6ddcb541 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -74,12 +74,12 @@ public class FileCollector { return Mono.just(fileData) // .cache() // - .flatMap(fd -> collectFile(fileData, contextMap)) // + .flatMap(fd -> tryCollectFile(fileData, contextMap)) // .retryBackoff(numRetries, firstBackoff) // - .flatMap(this::checkCollectedFile); + .flatMap(FileCollector::checkCollectedFile); } - private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) { + private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) { if (info.isPresent()) { return Mono.just(info.get()); } else { @@ -88,7 +88,7 @@ public class FileCollector { } } - private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) { + private Mono<Optional<FilePublishInformation>> tryCollectFile(FileData fileData, Map<String, String> context) { MDC.setContextMap(context); logger.trace("starting to collectFile {}", fileData.name()); @@ -110,7 +110,7 @@ public class FileCollector { } } catch (Exception throwable) { logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), - throwable.toString()); + throwable.toString(), throwable); return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } } @@ -126,7 +126,7 @@ public class FileCollector { } } - private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile, + private static FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile, Map<String, String> context) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); @@ -143,6 +143,7 @@ public class FileCollector { .compression(fileData.compression()) // .fileFormatType(fileData.fileFormatType()) // .fileFormatVersion(fileData.fileFormatVersion()) // + .changeIdentifier(fileData.messageMetaData().changeIdentifier()) // .context(context) // .build(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java index e18da248..4d8d679d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -21,18 +21,23 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.io.InputStream; +import java.net.MalformedURLException; import java.net.URI; +import java.net.URISyntaxException; import java.time.Duration; import java.util.Map; + import org.apache.commons.io.IOUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -44,12 +49,9 @@ import org.slf4j.MDC; * */ public class PublishedChecker { - private static final String FEEDLOG_TOPIC = "feedlog"; - private static final String DEFAULT_FEED_ID = "1"; - private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4); + private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4); private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final AppConfig appConfig; /** @@ -66,19 +68,24 @@ public class PublishedChecker { * * @param fileName the name of the file used when it is published. * - * @return <code>true</code> if the file has been published before, <code>false</code> otherwise. + * @return <code>true</code> if the file has been published before, <code>false</code> + * otherwise. + * @throws DatafileTaskException if the check fails */ - public boolean isFilePublished(String fileName, Map<String, String> contextMap) { + public boolean isFilePublished(String fileName, String changeIdentifier, Map<String, String> contextMap) + throws DatafileTaskException { MDC.setContextMap(contextMap); - DmaapProducerHttpClient producerClient = resolveClient(); + PublisherConfiguration publisherConfig = resolveConfiguration(changeIdentifier); + + DmaapProducerHttpClient producerClient = resolveClient(publisherConfig); HttpGet getRequest = new HttpGet(); MappedDiagnosticContext.appendTraceInfo(getRequest); - getRequest.setURI(getPublishedQueryUri(fileName, producerClient)); - producerClient.addUserCredentialsToHead(getRequest); - try { + getRequest.setURI(getPublishedQueryUri(fileName, publisherConfig)); + producerClient.addUserCredentialsToHead(getRequest); + HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap); @@ -95,20 +102,23 @@ public class PublishedChecker { } } - private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) { - return producerClient.getBaseUri() // - .pathSegment(FEEDLOG_TOPIC) // - .pathSegment(DEFAULT_FEED_ID) // - .queryParam("type", "pub") // - .queryParam("filename", fileName) // + private static URI getPublishedQueryUri(String fileName, PublisherConfiguration config) throws URISyntaxException { + return new URIBuilder(config.logUrl()) // + .addParameter("type", "pub") // + .addParameter("filename", fileName) // .build(); } - protected DmaapPublisherConfiguration resolveConfiguration() { - return appConfig.getDmaapPublisherConfiguration(); + protected PublisherConfiguration resolveConfiguration(String changeIdentifier) throws DatafileTaskException { + return appConfig.getPublisherConfiguration(changeIdentifier); } - protected DmaapProducerHttpClient resolveClient() { - return new DmaapProducerHttpClient(resolveConfiguration()); + protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) + throws DatafileTaskException { + try { + return new DmaapProducerHttpClient(publisherConfig.toDmaap()); + } catch (MalformedURLException e) { + throw new DatafileTaskException("Cannot create published checker client", e); + } } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index b5fa0c24..bac52659 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -86,13 +87,16 @@ public class ScheduledTasks { threadPoolQueueSize.get()); return; } + if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) { + logger.warn("No configuration loaded, skipping polling for messages"); + return; + } currentNumberOfSubscriptions.incrementAndGet(); Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); logger.trace("Execution of tasks was registered"); - applicationConfiguration.loadConfigurationFromFile(); createMainTask(context) // - .subscribe(this::onSuccess, // + .subscribe(ScheduledTasks::onSuccess, // throwable -> { onError(throwable, context); currentNumberOfSubscriptions.decrementAndGet(); @@ -115,6 +119,7 @@ public class ScheduledTasks { .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // .flatMap(fileData -> createMdcContext(fileData, context)) // + .filter(this::isFeedConfigured) // .filter(this::shouldBePublished) // .flatMap(this::fetchFile, false, 1, 1) // .flatMap(this::publishToDataRouter, false, 1, 1) // @@ -124,13 +129,13 @@ public class ScheduledTasks { } private class FileDataWithContext { - FileDataWithContext(FileData fileData, Map<String, String> context) { + public final FileData fileData; + public final Map<String, String> context; + + public FileDataWithContext(FileData fileData, Map<String, String> context) { this.fileData = fileData; this.context = context; } - - final FileData fileData; - final Map<String, String> context; } /** @@ -160,7 +165,7 @@ public class ScheduledTasks { return this.threadPoolQueueSize.get(); } - protected DMaaPMessageConsumer createConsumerTask() { + protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { return new DMaaPMessageConsumer(this.applicationConfiguration); } @@ -172,17 +177,17 @@ public class ScheduledTasks { return new DataRouterPublisher(applicationConfiguration); } - private void onComplete(Map<String, String> contextMap) { + private static void onComplete(Map<String, String> contextMap) { MDC.setContextMap(contextMap); logger.trace("Datafile tasks have been completed"); } - private synchronized void onSuccess(FilePublishInformation publishInfo) { + private static synchronized void onSuccess(FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); logger.info("Datafile file published {}", publishInfo.getInternalLocation()); } - private void onError(Throwable throwable, Map<String, String> context) { + private static void onError(Throwable throwable, Map<String, String> context) { MDC.setContextMap(context); logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString()); } @@ -194,11 +199,26 @@ public class ScheduledTasks { return Mono.just(pair); } + private boolean isFeedConfigured(FileDataWithContext fileData) { + if (applicationConfiguration.isFeedConfigured(fileData.fileData.messageMetaData().changeIdentifier())) { + return true; + } else { + logger.info("No feed is configured for: {}, file ignored: {}", + fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name()); + return false; + } + } + private boolean shouldBePublished(FileDataWithContext fileData) { boolean result = false; Path localFilePath = fileData.fileData.getLocalFilePath(); if (publishedFilesCache.put(localFilePath) == null) { - result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context); + try { + result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), + fileData.fileData.messageMetaData().changeIdentifier(), fileData.context); + } catch (DatafileTaskException e) { + logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e); + } } if (!result) { currentNumberOfTasks.decrementAndGet(); @@ -248,13 +268,19 @@ public class ScheduledTasks { */ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { logger.info( - "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}", + "Consuming new file ready messages, current number of tasks: {}, published files: {}, " + + "number of subscriptions: {}", getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); Map<String, String> context = MDC.getCopyOfContextMap(); - return createConsumerTask() // - .getMessageRouterResponse() // - .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); + try { + return createConsumerTask() // + .getMessageRouterResponse() // + .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); + } catch (Exception e) { + logger.error("Could not create message consumer task", e); + return Flux.empty(); + } } private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) { @@ -264,7 +290,7 @@ public class ScheduledTasks { return Flux.empty(); } - private void deleteFile(Path localFile, Map<String, String> context) { + private static void deleteFile(Path localFile, Map<String, String> context) { MDC.setContextMap(context); logger.trace("Deleting file: {}", localFile); try { diff --git a/datafile-app-server/src/main/resources/datafile_endpoints.json b/datafile-app-server/src/main/resources/datafile_endpoints.json deleted file mode 100644 index 8d45bc84..00000000 --- a/datafile-app-server/src/main/resources/datafile_endpoints.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "configs": { - "dmaap": { - "dmaapConsumerConfiguration": { - "dmaapHostName": "localhost", - "dmaapPortNumber": 2222, - "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT", - "dmaapProtocol": "http", - "dmaapUserName": "", - "dmaapUserPassword": "", - "dmaapContentType": "application/json", - "consumerId": "C12", - "consumerGroup": "OpenDcae-c12", - "timeoutMs": -1, - "messageLimit": 1 - }, - "dmaapProducerConfiguration": { - "dmaapHostName": "localhost", - "dmaapPortNumber": 3907, - "dmaapTopicName": "publish", - "dmaapProtocol": "https", - "dmaapUserName": "dradmin", - "dmaapUserPassword": "dradmin", - "dmaapContentType": "application/octet-stream" - } - }, - "ftp": { - "ftpesConfiguration": { - "keyCert": "config/dfc.jks", - "keyPassword": "secret", - "trustedCa": "config/ftp.jks", - "trustedCaPassword": "secret" - } - } - } -} - diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java index 5be75ab3..b1148a6a 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java @@ -16,25 +16,51 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + +import com.google.common.base.Charsets; +import com.google.common.io.Resources; import com.google.gson.JsonElement; +import com.google.gson.JsonIOException; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.Objects; +import java.util.Map; +import java.util.Properties; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; +import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; /** * Tests the AppConfig. @@ -44,167 +70,285 @@ import org.junit.jupiter.api.Test; */ class AppConfigTest { - private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json"; - private static final boolean CORRECT_JSON = true; - private static final boolean INCORRECT_JSON = false; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + + + private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = // + new ImmutableDmaapConsumerConfiguration.Builder() // + .timeoutMs(-1) // + .dmaapHostName("message-router.onap.svc.cluster.local") // + .dmaapUserName("admin") // + .dmaapUserPassword("admin") // + .dmaapTopicName("events/unauthenticated.VES_NOTIFICATION_OUTPUT") // + .dmaapPortNumber(2222) // + .dmaapContentType("application/json") // + .messageLimit(-1) // + .dmaapProtocol("http") // + .consumerId("C12") // + .consumerGroup("OpenDcae-c12") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); + + private static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() // + .topicUrl( + "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12") + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); + + private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = // + ImmutablePublisherConfiguration.builder() // + .publishUrl("https://message-router.onap.svc.cluster.local:3907/publish/1") // + .logUrl("https://dmaap.example.com/feedlog/972").trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .changeIdentifier("PM_MEAS_FILES") // + .userName("user") // + .passWord("password") // + .build(); + + private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = // + new ImmutableFtpesConfig.Builder() // + .keyCert("/config/dfc.jks") // + .keyPassword("secret") // + .trustedCa("config/ftp.jks") // + .trustedCaPassword("secret") // + .build(); + + private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = // + new ImmutableDmaapPublisherConfiguration.Builder() // + .dmaapTopicName("/publish/1") // + .dmaapUserPassword("password") // + .dmaapPortNumber(3907) // + .dmaapProtocol("https") // + .dmaapContentType("application/octet-stream") // + .dmaapHostName("message-router.onap.svc.cluster.local") // + .dmaapUserName("user") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); + + private static EnvProperties properties() { + return ImmutableEnvProperties.builder() // + .consulHost("host") // + .consulPort(123) // + .cbsName("cbsName") // + .appName("appName") // + .build(); + } - private static AppConfig appConfigUnderTest; + private AppConfig appConfigUnderTest; + private CloudConfigurationProvider cloudConfigurationProvider = mock(CloudConfigurationProvider.class); + private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); - private static String filePath = - Objects.requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile(); @BeforeEach public void setUp() { appConfigUnderTest = spy(AppConfig.class); + appConfigUnderTest.setCloudConfigurationProvider(cloudConfigurationProvider); + appConfigUnderTest.systemEnvironment = new Properties(); } @Test - public void whenApplicationWasStarted_FilePathIsSet() { + public void whenTheConfigurationFits() throws IOException, DatafileTaskException { // When - appConfigUnderTest.setFilepath(filePath); + doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any()); + appConfigUnderTest.initialize(); // Then - verify(appConfigUnderTest, times(1)).setFilepath(anyString()); - verify(appConfigUnderTest, times(0)).loadConfigurationFromFile(); - Assertions.assertEquals(filePath, appConfigUnderTest.getFilepath()); + verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); + + ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration(); + Assertions.assertNotNull(consumerCfg); + assertThat(consumerCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG); + assertThat(consumerCfg).isEqualToComparingFieldByField(CORRECT_CONSUMER_CONFIG); + + PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER); + Assertions.assertNotNull(publisherCfg); + assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG); + assertThat(publisherCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG); + + FtpesConfig ftpesConfig = appConfigUnderTest.getFtpesConfiguration(); + assertThat(ftpesConfig).isNotNull(); + assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION); } @Test - public void whenTheConfigurationFits_GetFtpsAndDmaapObjectRepresentationConfiguration() throws IOException { - // Given - InputStream inputStream = - new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8))); - + public void whenTheConfigurationFits_twoProducers() throws IOException, DatafileTaskException { // When - appConfigUnderTest.setFilepath(filePath); - doReturn(inputStream).when(appConfigUnderTest).createInputStream(any()); + doReturn(getCorrectJsonTwoProducers()).when(appConfigUnderTest).createInputStream(any()); appConfigUnderTest.loadConfigurationFromFile(); // Then - verify(appConfigUnderTest, times(1)).setFilepath(anyString()); verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); - Assertions.assertNotNull(appConfigUnderTest.getDmaapPublisherConfiguration()); - Assertions.assertEquals(appConfigUnderTest.getDmaapPublisherConfiguration(), - appConfigUnderTest.getDmaapPublisherConfiguration()); - Assertions.assertEquals(appConfigUnderTest.getDmaapConsumerConfiguration(), - appConfigUnderTest.getDmaapConsumerConfiguration()); - Assertions.assertEquals(appConfigUnderTest.getFtpesConfiguration(), appConfigUnderTest.getFtpesConfiguration()); + Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER)); + Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("XX_FILES")); + Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("YY_FILES")); + + assertThat(appConfigUnderTest.getPublisherConfiguration("XX_FILES").publishUrl()) + .isEqualTo("feed01::publish_url"); + assertThat(appConfigUnderTest.getPublisherConfiguration("YY_FILES").publishUrl()) + .isEqualTo("feed01::publish_url"); } @Test - public void whenFileIsNotExist_ThrowIoException() { + public void whenFileIsNotExist_ThrowException() throws DatafileTaskException { // Given - filePath = "/temp.json"; - appConfigUnderTest.setFilepath(filePath); + appConfigUnderTest.setFilepath("/temp.json"); // When appConfigUnderTest.loadConfigurationFromFile(); // Then - verify(appConfigUnderTest, times(1)).setFilepath(anyString()); - verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); - Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); - Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); + assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER)) + .hasMessageContaining("No PublishingConfiguration loaded, changeIdentifier: PM_MEAS_FILES"); + Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); } @Test - public void whenFileIsExistsButJsonIsIncorrect() throws IOException { - // Given - InputStream inputStream = - new ByteArrayInputStream((getJsonConfig(INCORRECT_JSON).getBytes(StandardCharsets.UTF_8))); + public void whenFileIsExistsButJsonIsIncorrect() throws IOException, DatafileTaskException { // When - appConfigUnderTest.setFilepath(filePath); - doReturn(inputStream).when(appConfigUnderTest).createInputStream(any()); + doReturn(getIncorrectJson()).when(appConfigUnderTest).createInputStream(any()); appConfigUnderTest.loadConfigurationFromFile(); // Then - verify(appConfigUnderTest, times(1)).setFilepath(anyString()); verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); - Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); + assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER)) + .hasMessageContaining(CHANGE_IDENTIFIER); Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); - } - @Test - public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException { - // Given - InputStream inputStream = - new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8))); + public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException, DatafileTaskException { + // When - appConfigUnderTest.setFilepath(filePath); - doReturn(inputStream).when(appConfigUnderTest).createInputStream(any()); + doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any()); JsonElement jsonElement = mock(JsonElement.class); when(jsonElement.isJsonObject()).thenReturn(false); doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class)); appConfigUnderTest.loadConfigurationFromFile(); // Then - verify(appConfigUnderTest, times(1)).setFilepath(anyString()); verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); - Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); + assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER)) + .hasMessageContaining(CHANGE_IDENTIFIER); Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); } - private String getJsonConfig(boolean correct) { - JsonObject dmaapConsumerConfigData = new JsonObject(); - dmaapConsumerConfigData.addProperty("dmaapHostName", "localhost"); - dmaapConsumerConfigData.addProperty("dmaapPortNumber", 2222); - dmaapConsumerConfigData.addProperty("dmaapTopicName", "/events/unauthenticated.VES_NOTIFICATION_OUTPUT"); - dmaapConsumerConfigData.addProperty("dmaapProtocol", "http"); - dmaapConsumerConfigData.addProperty("dmaapUserName", "admin"); - dmaapConsumerConfigData.addProperty("dmaapUserPassword", "admin"); - dmaapConsumerConfigData.addProperty("dmaapContentType", "application/json"); - dmaapConsumerConfigData.addProperty("consumerId", "C12"); - dmaapConsumerConfigData.addProperty("consumerGroup", "OpenDcae-c12"); - dmaapConsumerConfigData.addProperty("timeoutMs", -1); - dmaapConsumerConfigData.addProperty("messageLimit", 1); - - JsonObject dmaapProducerConfigData = new JsonObject(); - dmaapProducerConfigData.addProperty("dmaapHostName", "localhost"); - dmaapProducerConfigData.addProperty("dmaapPortNumber", 3907); - dmaapProducerConfigData.addProperty("dmaapTopicName", "publish"); - dmaapProducerConfigData.addProperty("dmaapProtocol", "https"); - if (correct) { - dmaapProducerConfigData.addProperty("dmaapUserName", "dradmin"); - dmaapProducerConfigData.addProperty("dmaapUserPassword", "dradmin"); - dmaapProducerConfigData.addProperty("dmaapContentType", "application/octet-stream"); - } - - JsonObject dmaapConfigs = new JsonObject(); - dmaapConfigs.add("dmaapConsumerConfiguration", dmaapConsumerConfigData); - dmaapConfigs.add("dmaapProducerConfiguration", dmaapProducerConfigData); - - JsonObject ftpesConfigData = new JsonObject(); - ftpesConfigData.addProperty("keyCert", "config/dfc.jks"); - ftpesConfigData.addProperty("keyPassword", "secret"); - ftpesConfigData.addProperty("trustedCa", "config/ftp.jks"); - ftpesConfigData.addProperty("trustedCaPassword", "secret"); - - JsonObject security = new JsonObject(); - security.addProperty("trustStorePath", "trustStorePath"); - security.addProperty("trustStorePasswordPath", "trustStorePasswordPath"); - security.addProperty("keyStorePath", "keyStorePath"); - security.addProperty("keyStorePasswordPath", "keyStorePasswordPath"); - security.addProperty("enableDmaapCertAuth", "enableDmaapCertAuth"); - - JsonObject ftpesConfiguration = new JsonObject(); - ftpesConfiguration.add("ftpesConfiguration", ftpesConfigData); - - JsonObject configs = new JsonObject(); - configs.add("dmaap", dmaapConfigs); - configs.add("ftp", ftpesConfiguration); - configs.add("security", security); - - JsonObject completeJson = new JsonObject(); - completeJson.add("configs", configs); - - return completeJson.toString(); + @Test + public void whenPeriodicConfigRefreshNoEnvironmentVariables() { + ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class); + + Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + + StepVerifier // + .create(task) // + .expectSubscription() // + .expectNextCount(0) // + .verifyComplete(); + + assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined")); + } + + @Test + public void whenPeriodicConfigRefreshNoConsul() { + ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class); + + doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any()); + Mono<JsonObject> err = Mono.error(new IOException()); + doReturn(err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any()); + + Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + + StepVerifier // + .create(task) // + .expectSubscription() // + .expectNextCount(0) // + .verifyComplete(); + + assertTrue(logAppender.list.toString() + .contains("Could not refresh application configuration java.io.IOException")); + } + + @Test + public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException { + doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any()); + + Mono<JsonObject> json = Mono.just(getJsonRootObject()); + + doReturn(json, json).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any()); + + Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + + StepVerifier // + .create(task) // + .expectSubscription() // + .expectNext(appConfigUnderTest) // + .verifyComplete(); + + Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); + } + + @Test + public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException { + doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any()); + + Mono<JsonObject> json = Mono.just(getJsonRootObject()); + Mono<JsonObject> err = Mono.error(new IOException()); // no config entry created by the + // dmaap plugin + + doReturn(json, err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any()); + + Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + + StepVerifier // + .create(task) // + .expectSubscription() // + .expectNext(appConfigUnderTest) // + .verifyComplete(); + + Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); + } + + private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException { + JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject(); + return rootObject; + } + + private static InputStream getCorrectJson() throws IOException { + URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test.json"); + String string = Resources.toString(url, Charsets.UTF_8); + return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8))); + } + + private static InputStream getCorrectJsonTwoProducers() throws IOException { + URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test_2producers.json"); + String string = Resources.toString(url, Charsets.UTF_8); + return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8))); + } + + private static InputStream getIncorrectJson() { + String string = "{" + // + " \"configs\": {" + // + " \"dmaap\": {"; // + return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8))); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java deleted file mode 100644 index 07233d95..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.configuration; - -import static org.assertj.core.api.Assertions.assertThat; -import com.google.gson.JsonObject; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; - - -class CloudConfigParserTest { - private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = // - new ImmutableDmaapConsumerConfiguration.Builder() // - .timeoutMs(-1) // - .dmaapHostName("message-router.onap.svc.cluster.local") // - .dmaapUserName("admin") // - .dmaapUserPassword("admin") // - .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") // - .dmaapPortNumber(2222) // - .dmaapContentType("application/json") // - .messageLimit(-1) // - .dmaapProtocol("http") // - .consumerId("C12") // - .consumerGroup("OpenDCAE-c12") // - .trustStorePath("trustStorePath") // - .trustStorePasswordPath("trustStorePasswordPath") // - .keyStorePath("keyStorePath") // - .keyStorePasswordPath("keyStorePasswordPath") // - .enableDmaapCertAuth(true) // - .build(); - - private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = // - new ImmutableDmaapPublisherConfiguration.Builder() // - .dmaapTopicName("publish") // - .dmaapUserPassword("dradmin") // - .dmaapPortNumber(3907) // - .dmaapProtocol("https") // - .dmaapContentType("application/json") // - .dmaapHostName("message-router.onap.svc.cluster.local") // - .dmaapUserName("dradmin") // - .trustStorePath("trustStorePath") // - .trustStorePasswordPath("trustStorePasswordPath") // - .keyStorePath("keyStorePath") // - .keyStorePasswordPath("keyStorePasswordPath") // - .enableDmaapCertAuth(true) // - .build(); - - private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = // - new ImmutableFtpesConfig.Builder() // - .keyCert("/config/dfc.jks") // - .keyPassword("secret") // - .trustedCa("config/ftp.jks") // - .trustedCaPassword("secret") // - .build(); - - private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject()); - - @Test - public void shouldCreateDmaapConsumerConfigurationCorrectly() { - DmaapConsumerConfiguration dmaapConsumerConfig = cloudConfigParser.getDmaapConsumerConfig(); - - assertThat(dmaapConsumerConfig).isNotNull(); - assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG); - } - - @Test - public void shouldCreateDmaapPublisherConfigurationCorrectly() { - DmaapPublisherConfiguration dmaapPublisherConfig = cloudConfigParser.getDmaapPublisherConfig(); - - assertThat(dmaapPublisherConfig).isNotNull(); - assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG); - } - - @Test - public void shouldCreateFtpesConfigurationCorrectly() { - FtpesConfig ftpesConfig = cloudConfigParser.getFtpesConfig(); - - assertThat(ftpesConfig).isNotNull(); - assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION); - } - - public JsonObject getCloudConfigJsonObject() { - JsonObject config = new JsonObject(); - config.addProperty("dmaap.dmaapConsumerConfiguration.timeoutMs", -1); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local"); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserName", "admin"); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserPassword", "admin"); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapTopicName", - "/events/unauthenticated.VES_NOTIFICATION_OUTPUT"); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapPortNumber", 2222); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapContentType", "application/json"); - config.addProperty("dmaap.dmaapConsumerConfiguration.messageLimit", -1); - config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapProtocol", "http"); - config.addProperty("dmaap.dmaapConsumerConfiguration.consumerId", "C12"); - config.addProperty("dmaap.dmaapConsumerConfiguration.consumerGroup", "OpenDCAE-c12"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapTopicName", "publish"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapProtocol", "https"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapContentType", "application/json"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapPortNumber", 3907); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserName", "dradmin"); - config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserPassword", "dradmin"); - config.addProperty("dmaap.ftpesConfig.keyCert", "/config/dfc.jks"); - config.addProperty("dmaap.ftpesConfig.keyPassword", "secret"); - config.addProperty("dmaap.ftpesConfig.trustedCa", "config/ftp.jks"); - config.addProperty("dmaap.ftpesConfig.trustedCaPassword", "secret"); - - config.addProperty("dmaap.security.trustStorePath", "trustStorePath"); - config.addProperty("dmaap.security.trustStorePasswordPath", "trustStorePasswordPath"); - config.addProperty("dmaap.security.keyStorePath", "keyStorePath"); - config.addProperty("dmaap.security.keyStorePasswordPath", "keyStorePasswordPath"); - config.addProperty("dmaap.security.enableDmaapCertAuth", "true"); - - return config; - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java index 6e2140b4..eba88c33 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -35,26 +36,40 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; + +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; + import reactor.test.StepVerifier; public class SchedulerConfigTest { + private final AppConfig appConfigurationMock = mock(AppConfig.class); + private final TaskScheduler taskSchedulerMock = mock(TaskScheduler.class); + private final ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); + private final SchedulerConfig schedulerUnderTest = + spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock)); + + @BeforeEach + public void setUp() { + doNothing().when(appConfigurationMock).stop(); + doNothing().when(appConfigurationMock).initialize(); + } + @Test public void getResponseFromCancellationOfTasks_success() { + List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class); scheduledFutureList.add(scheduledFutureMock); SchedulerConfig.setScheduledFutureList(scheduledFutureList); - SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null); - String msg = "Datafile Service has already been stopped!"; StepVerifier.create(schedulerUnderTest.getResponseFromCancellationOfTasks()) .expectNext(new ResponseEntity<String>(msg, HttpStatus.CREATED)) // @@ -68,24 +83,17 @@ public class SchedulerConfigTest { @Test public void tryToStartTaskWhenNotStarted_success() { - TaskScheduler taskSchedulerMock = mock(TaskScheduler.class); - ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); - CloudConfiguration cloudConfigurationMock = mock(CloudConfiguration.class); List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); SchedulerConfig.setScheduledFutureList(scheduledFutureList); SchedulerConfig schedulerUnderTestSpy = - spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, cloudConfigurationMock)); + spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock)); boolean actualResult = schedulerUnderTestSpy.tryToStartTask(); assertTrue(actualResult); - ArgumentCaptor<Runnable> runTaskRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); - verify(taskSchedulerMock).scheduleAtFixedRate(runTaskRunnableCaptor.capture(), any(Instant.class), - eq(Duration.ofMinutes(5))); - ArgumentCaptor<Runnable> scheduleMainDatafileEventTaskCaptor = ArgumentCaptor.forClass(Runnable.class); verify(taskSchedulerMock).scheduleWithFixedDelay(scheduleMainDatafileEventTaskCaptor.capture(), eq(Duration.ofSeconds(15))); @@ -100,22 +108,22 @@ public class SchedulerConfigTest { verify(scheduledTasksMock).executeDatafileMainTask(); verifyNoMoreInteractions(scheduledTasksMock); - runTaskRunnableCaptor.getValue().run(); - verify(cloudConfigurationMock).runTask(); - verifyNoMoreInteractions(cloudConfigurationMock); + verify(appConfigurationMock).initialize(); + verifyNoMoreInteractions(appConfigurationMock); - assertEquals(3, scheduledFutureList.size()); + assertEquals(2, scheduledFutureList.size()); } @Test public void tryToStartTaskWhenAlreadyStarted_shouldReturnFalse() { + doNothing().when(appConfigurationMock).loadConfigurationFromFile(); List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class); scheduledFutureList.add(scheduledFutureMock); SchedulerConfig.setScheduledFutureList(scheduledFutureList); - SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null); + SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, appConfigurationMock); boolean actualResult = schedulerUnderTest.tryToStartTask(); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java deleted file mode 100644 index 7f6b8c51..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java +++ /dev/null @@ -1,63 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.integration; - -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.verify; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit.jupiter.SpringExtension; -import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; - -/** - * Integration test for the ScheduledXmlContext. - * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ - -@Configuration -@ComponentScan -@ExtendWith({ SpringExtension.class }) -@ContextConfiguration(locations = { "classpath:scheduled-context.xml" }) -class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests { - - private static final int WAIT_FOR_SCHEDULING = 1; - - @Autowired - private ScheduledTasks scheduledTask; - - @Test - void testScheduling() { - final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS); - } - - private void verifyDmaapConsumerTask() { - verify(scheduledTask, atLeast(1)).executeDatafileMainTask(); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java deleted file mode 100644 index 83c92ef4..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.model; - -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashMap; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class FilePublishInformationTest { - private static final String PRODUCT_NAME = "NrRadio"; - private static final String VENDOR_NAME = "Ericsson"; - private static final String LAST_EPOCH_MICROSEC = "8745745764578"; - private static final String SOURCE_NAME = "oteNB5309"; - private static final String START_EPOCH_MICROSEC = "8745745764578"; - private static final String TIME_ZONE_OFFSET = "UTC+05:00"; - private static final String NAME = "A20161224.1030-1045.bin.gz"; - private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz"; - private static final Path INTERNAL_LOCATION = Paths.get("target/A20161224.1030-1045.bin.gz"); - private static final String COMPRESSION = "gzip"; - private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; - private static final String FILE_FORMAT_VERSION = "V10"; - - @Test - public void filePublishInformationBuilder_shouldBuildAnObject() { - FilePublishInformation filePublishInformation = ImmutableFilePublishInformation.builder() // - .productName(PRODUCT_NAME) // - .vendorName(VENDOR_NAME) // - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // - .sourceName(SOURCE_NAME) // - .startEpochMicrosec(START_EPOCH_MICROSEC) // - .timeZoneOffset(TIME_ZONE_OFFSET) // - .name(NAME) // - .location(LOCATION) // - .internalLocation(INTERNAL_LOCATION) // - .compression(COMPRESSION) // - .fileFormatType(FILE_FORMAT_TYPE) // - .fileFormatVersion(FILE_FORMAT_VERSION) // - .context(new HashMap<String,String>()) // - .build(); - - Assertions.assertNotNull(filePublishInformation); - Assertions.assertEquals(PRODUCT_NAME, filePublishInformation.getProductName()); - Assertions.assertEquals(VENDOR_NAME, filePublishInformation.getVendorName()); - Assertions.assertEquals(LAST_EPOCH_MICROSEC, filePublishInformation.getLastEpochMicrosec()); - Assertions.assertEquals(SOURCE_NAME, filePublishInformation.getSourceName()); - Assertions.assertEquals(START_EPOCH_MICROSEC, filePublishInformation.getStartEpochMicrosec()); - Assertions.assertEquals(TIME_ZONE_OFFSET, filePublishInformation.getTimeZoneOffset()); - Assertions.assertEquals(NAME, filePublishInformation.getName()); - Assertions.assertEquals(LOCATION, filePublishInformation.getLocation()); - Assertions.assertEquals(INTERNAL_LOCATION, filePublishInformation.getInternalLocation()); - Assertions.assertEquals(COMPRESSION, filePublishInformation.getCompression()); - Assertions.assertEquals(FILE_FORMAT_TYPE, filePublishInformation.getFileFormatType()); - Assertions.assertEquals(FILE_FORMAT_VERSION, filePublishInformation.getFileFormatVersion()); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java index becfba31..8c7938bf 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java @@ -22,9 +22,12 @@ import static org.mockito.Mockito.spy; import com.google.gson.JsonElement; import com.google.gson.JsonParser; + +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Optional; + import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; @@ -36,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; + import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -65,7 +69,7 @@ class JsonMessageParserTest { private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; @Test - void whenPassingCorrectJson_oneFileReadyMessage() { + void whenPassingCorrectJson_oneFileReadyMessage() throws URISyntaxException { AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // .name(PM_FILE_NAME) // .location(LOCATION) // diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java index e21bbd7b..a71521cd 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java @@ -26,7 +26,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.net.URI; + import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.KeyStoreException; @@ -35,7 +35,9 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; + import javax.net.ssl.SSLContext; + import org.apache.commons.codec.binary.Base64; import org.apache.http.Header; import org.apache.http.HttpResponse; @@ -192,10 +194,4 @@ class DmaapProducerHttpClientTest { Header[] authorizationHeaders = request.getHeaders("Authorization"); assertEquals(base64Creds, authorizationHeaders[0].getValue()); } - - @Test - public void getBaseUri_success() { - URI uri = producerClientUnderTestSpy.getBaseUri().build(); - assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString()); - } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java index 5e737253..574ad18e 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java @@ -77,6 +77,7 @@ public class DMaaPMessageConsumerTest { private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; private static final String FILE_FORMAT_VERSION = "V10"; private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<FilePublishInformation>(); + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; private DMaaPConsumerReactiveHttpClient httpClientMock; @@ -173,6 +174,7 @@ public class DMaaPMessageConsumerTest { .compression(GZIP_COMPRESSION) // .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .changeIdentifier(CHANGE_IDENTIFIER) // .context(new HashMap<String,String>()) // .build(); listOfFilePublishInformation.add(filePublishInformation); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 8f768d38..463c62c9 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -47,14 +47,12 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.springframework.http.HttpStatus; -import org.springframework.web.util.DefaultUriBuilderFactory; -import org.springframework.web.util.UriBuilder; import reactor.test.StepVerifier; @@ -65,6 +63,7 @@ import reactor.test.StepVerifier; * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ class DataRouterPublisherTest { + private static final String PRODUCT_NAME = "NrRadio"; private static final String VENDOR_NAME = "Ericsson"; private static final String LAST_EPOCH_MICROSEC = "8745745764578"; @@ -73,6 +72,7 @@ class DataRouterPublisherTest { private static final String TIME_ZONE_OFFSET = "UTC+05:00"; private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; private static final String COMPRESSION = "gzip"; private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; @@ -90,15 +90,17 @@ class DataRouterPublisherTest { private static FilePublishInformation filePublishInformation; private static DmaapProducerHttpClient httpClientMock; private static AppConfig appConfig; - private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class); + private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class); private static Map<String, String> context = new HashMap<>(); private static DataRouterPublisher publisherTaskUnderTestSpy; + // "https://54.45.333.2:1234/publish/1"; + private static final String PUBLISH_URL = + HTTPS_SCHEME + "://" + HOST + ":" + PORT + "/" + PUBLISH_TOPIC + "/" + FEED_ID; + @BeforeAll public static void setUp() { - when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST); - when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME); - when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT); + when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL); filePublishInformation = ImmutableFilePublishInformation.builder() // .productName(PRODUCT_NAME) // @@ -114,6 +116,7 @@ class DataRouterPublisherTest { .fileFormatType(FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // .context(context) // + .changeIdentifier(CHANGE_IDENTIFIER) // .build(); // appConfig = mock(AppConfig.class); publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig)); @@ -128,7 +131,6 @@ class DataRouterPublisherTest { .verifyComplete(); ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class); - verify(httpClientMock).getBaseUri(); verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class)); verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any()); verifyNoMoreInteractions(httpClientMock); @@ -138,6 +140,7 @@ class DataRouterPublisherTest { assertEquals(HTTPS_SCHEME, actualUri.getScheme()); assertEquals(HOST, actualUri.getHost()); assertEquals(PORT, actualUri.getPort()); + Path actualPath = Paths.get(actualUri.getPath()); assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString())); assertTrue(FEED_ID.equals(actualPath.getName(1).toString())); @@ -160,7 +163,8 @@ class DataRouterPublisherTest { assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType")); assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion")); - // Note that the following line checks the number of properties that are sent to the data router. + // Note that the following line checks the number of properties that are sent to the data + // router. // This should be 10 unless the API is updated (which is the fields checked above) assertEquals(10, metaHash.size()); } @@ -185,7 +189,6 @@ class DataRouterPublisherTest { .expectNext(filePublishInformation) // .verifyComplete(); - verify(httpClientMock, times(2)).getBaseUri(); verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class)); verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()); verifyNoMoreInteractions(httpClientMock); @@ -201,7 +204,6 @@ class DataRouterPublisherTest { .expectErrorMessage("Retries exhausted: 1/1") // .verify(); - verify(httpClientMock, times(2)).getBaseUri(); verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class)); verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()); verifyNoMoreInteractions(httpClientMock); @@ -211,12 +213,9 @@ class DataRouterPublisherTest { final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses) throws Exception { httpClientMock = mock(DmaapProducerHttpClient.class); - when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock); - doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(); - doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(); - - UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT); - when(httpClientMock.getBaseUri()).thenReturn(uriBuilder); + when(appConfig.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock); + doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER); + doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(CHANGE_IDENTIFIER); HttpResponse httpResponseMock = mock(HttpResponse.class); if (exception == null) { diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index cad3486d..299a0238 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -83,6 +83,7 @@ public class FileCollectorTest { private static final String FTP_KEY_PASSWORD = "ftpKeyPassword"; private static final String TRUSTED_CA_PATH = "trustedCAPath"; private static final String TRUSTED_CA_PASSWORD = "trustedCAPassword"; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; private static AppConfig appConfigMock = mock(AppConfig.class); private static FtpesConfig ftpesConfigMock = mock(FtpesConfig.class); @@ -132,7 +133,8 @@ public class FileCollectorTest { .compression(GZIP_COMPRESSION) // .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // - .context(new HashMap<String,String>()) + .context(new HashMap<String,String>()) // + .changeIdentifier(CHANGE_IDENTIFIER) // .build(); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java index 83643637..44755814 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java @@ -34,10 +34,9 @@ import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.net.URI; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; + import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.StatusLine; @@ -47,55 +46,47 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.springframework.web.util.DefaultUriBuilderFactory; -import org.springframework.web.util.UriBuilder; public class PublishedCheckerTest { + private static final String PUBLISH_URL = "https://54.45.33.2:1234/"; private static final String EMPTY_CONTENT = "[]"; - private static final String FEEDLOG_TOPIC = "feedlog"; - private static final String FEED_ID = "1"; - private static final String HTTPS_SCHEME = "https"; - private static final String HOST = "54.45.33.2"; - private static final int PORT = 1234; private static final String SOURCE_NAME = "oteNB5309"; private static final String FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + private static final String LOG_URI = "https://localhost:3907/feedlog/1"; private static final Map<String, String> CONTEXT_MAP = new HashMap<>(); - private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class); + private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class); private static AppConfig appConfigMock; private DmaapProducerHttpClient httpClientMock = mock(DmaapProducerHttpClient.class); private PublishedChecker publishedCheckerUnderTestSpy; - /** - * Sets up data for the tests. - */ + @BeforeAll - public static void setUp() { - when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST); - when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME); - when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT); + public static void setUp() throws DatafileTaskException { + when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL); appConfigMock = mock(AppConfig.class); - when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock); + when(appConfigMock.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock); } @Test public void executeWhenNotPublished_returnsFalse() throws Exception { prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null); - boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP); + boolean isPublished = + publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP); assertFalse(isPublished); ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class); - verify(httpClientMock).getBaseUri(); verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class)); verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), any(), any()); verifyNoMoreInteractions(httpClientMock); @@ -103,22 +94,17 @@ public class PublishedCheckerTest { HttpUriRequest getRequest = requestCaptor.getValue(); assertTrue(getRequest instanceof HttpGet); URI actualUri = getRequest.getURI(); - assertEquals(HTTPS_SCHEME, actualUri.getScheme()); - assertEquals(HOST, actualUri.getHost()); - assertEquals(PORT, actualUri.getPort()); - Path actualPath = Paths.get(actualUri.getPath()); - assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString())); - assertTrue(FEED_ID.equals(actualPath.getName(1).toString())); - String actualQuery = actualUri.getQuery(); - assertTrue(actualQuery.contains("type=pub")); - assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME)); + // https://localhost:3907/feedlog/1?type=pub&filename=oteNB5309_A20161224.1030-1045.bin.gz + String expUri = LOG_URI + "?type=pub&filename=" + LOCAL_FILE_NAME; + assertEquals(expUri, actualUri.toString()); } @Test public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception { prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null); - boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP); + boolean isPublished = + publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP); assertFalse(isPublished); } @@ -127,7 +113,8 @@ public class PublishedCheckerTest { public void executeWhenPublished_returnsTrue() throws Exception { prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null); - boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP); + boolean isPublished = + publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP); assertTrue(isPublished); } @@ -136,7 +123,8 @@ public class PublishedCheckerTest { public void executeWhenErrorInDataRouter_returnsFalse() throws Exception { prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException("")); - boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP); + boolean isPublished = + publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP); assertFalse(isPublished); } @@ -144,11 +132,9 @@ public class PublishedCheckerTest { final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception { publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock)); - doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration(); - doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient(); - - UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT); - when(httpClientMock.getBaseUri()).thenReturn(uriBuilder); + doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER); + doReturn(LOG_URI).when(publisherConfigurationMock).logUrl(); + doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient(publisherConfigurationMock); HttpResponse httpResponseMock = mock(HttpResponse.class); if (exception == null) { diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 0d5a4231..a1021868 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -42,6 +42,11 @@ import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; @@ -51,8 +56,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformati import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -61,6 +64,7 @@ import reactor.test.StepVerifier; public class ScheduledTasksTest { private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; private AppConfig appConfig = mock(AppConfig.class); private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig)); @@ -72,23 +76,33 @@ public class ScheduledTasksTest { private DataRouterPublisher dataRouterMock; private Map<String, String> contextMap = new HashMap<String, String>(); + private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT"; + @BeforeEach - private void setUp() { - DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() // - .dmaapContentType("application/json") // - .dmaapHostName("54.45.33.2") // - .dmaapPortNumber(1234) // - .dmaapProtocol("https") // - .dmaapUserName("DFC") // - .dmaapUserPassword("DFC") // - .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") // + private void setUp() throws DatafileTaskException { + final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() // + .publishUrl(publishUrl) // + .logUrl("") // + .userName("userName") // + .passWord("passWord") // .trustStorePath("trustStorePath") // .trustStorePasswordPath("trustStorePasswordPath") // .keyStorePath("keyStorePath") // .keyStorePasswordPath("keyStorePasswordPath") // .enableDmaapCertAuth(true) // + .changeIdentifier(CHANGE_IDENTIFIER) // .build(); // - doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration(); + final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() // + .topicUrl("topicUrl").trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); + + doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER); + doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration(); + doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER); consumerMock = mock(DMaaPMessageConsumer.class); publishedCheckerMock = mock(PublishedChecker.class); @@ -109,7 +123,7 @@ public class ScheduledTasksTest { .sourceName("") // .startEpochMicrosec("") // .timeZoneOffset("") // - .changeIdentifier("") // + .changeIdentifier(CHANGE_IDENTIFIER) // .changeType("") // .build(); } @@ -164,11 +178,12 @@ public class ScheduledTasksTest { .compression("") // .fileFormatType("") // .fileFormatVersion("") // + .changeIdentifier(CHANGE_IDENTIFIER) // .context(new HashMap<String, String>()).build(); } @Test - public void notingToConsume() { + public void notingToConsume() throws DatafileTaskException { doReturn(consumerMock).when(testedObject).createConsumerTask(); doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse(); @@ -180,7 +195,7 @@ public class ScheduledTasksTest { } @Test - public void consume_successfulCase() { + public void consume_successfulCase() throws DatafileTaskException { final int noOfEvents = 200; final int noOfFilesPerEvent = 200; final int noOfFiles = noOfEvents * noOfFilesPerEvent; @@ -188,7 +203,7 @@ public class ScheduledTasksTest { Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true); doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse(); - doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any()); + doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any()); Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation()); doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull()); @@ -212,11 +227,11 @@ public class ScheduledTasksTest { } @Test - public void consume_fetchFailedOnce() { + public void consume_fetchFailedOnce() throws DatafileTaskException { Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse(); - doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any()); + doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any()); Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation()); Mono<Object> error = Mono.error(new Exception("problem")); @@ -246,12 +261,12 @@ public class ScheduledTasksTest { } @Test - public void consume_publishFailedOnce() { + public void consume_publishFailedOnce() throws DatafileTaskException { Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse(); - doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any()); + doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any()); Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation()); doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull()); @@ -279,7 +294,7 @@ public class ScheduledTasksTest { } @Test - public void consume_successfulCase_sameFileNames() { + public void consume_successfulCase_sameFileNames() throws DatafileTaskException { final int noOfEvents = 1; final int noOfFilesPerEvent = 100; @@ -287,7 +302,7 @@ public class ScheduledTasksTest { Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false); doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse(); - doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any()); + doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any()); Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation()); doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull()); @@ -303,7 +318,7 @@ public class ScheduledTasksTest { verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull()); - verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull()); + verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); diff --git a/datafile-app-server/src/test/resources/datafile_endpoints.json b/datafile-app-server/src/test/resources/datafile_endpoints.json deleted file mode 100644 index 14dee368..00000000 --- a/datafile-app-server/src/test/resources/datafile_endpoints.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "configs": { - "dmaap": { - "dmaapConsumerConfiguration": { - "consumerId": "C12", - "dmaapHostName": "localhost", - "dmaapPortNumber": 2222, - "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT", - "dmaapProtocol": "http", - "dmaapUserName": "admin", - "dmaapUserPassword": "admin", - "dmaapContentType": "application/json", - "consumerGroup": "OpenDcae-c12", - "timeoutMs": -1, - "messageLimit": 1 - }, - "dmaapProducerConfiguration": { - "dmaapHostName": "localhost", - "dmaapPortNumber": 3907, - "dmaapProtocol": "https", - "dmaapTopicName": "publish", - "dmaapUserName": "dradmin", - "dmaapUserPassword": "dradmin", - "dmaapContentType": "application/octet-stream" - } - }, - "ftp": { - "ftpesConfiguration": { - "keyCert": "/config/dfc.jks", - "keyPassword": "secret", - "trustedCa": "/config/ftp.jks", - "trustedCaPassword": "secret" - } - }, - "security": { - "trustStorePath" : "trustStorePath", - "trustStorePasswordPath" : "trustStorePasswordPath", - "keyStorePath" : "keyStorePath", - "keyStorePasswordPath" : "keyStorePasswordPath", - "enableDmaapCertAuth" : "enableDmaapCertAuth" - } - } -} - diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test.json b/datafile-app-server/src/test/resources/datafile_endpoints_test.json new file mode 100644 index 00000000..4d4d00ab --- /dev/null +++ b/datafile-app-server/src/test/resources/datafile_endpoints_test.json @@ -0,0 +1,31 @@ +{ + "dmaap.ftpesConfig.keyCert":"/config/dfc.jks", + "dmaap.ftpesConfig.keyPassword":"secret", + "dmaap.ftpesConfig.trustedCa":"config/ftp.jks", + "dmaap.ftpesConfig.trustedCaPassword":"secret", + "dmaap.security.trustStorePath":"trustStorePath", + "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath", + "dmaap.security.keyStorePath":"keyStorePath", + "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath", + "dmaap.security.enableDmaapCertAuth":"true", + "dmaap.dmaapProducerConfiguration": { + "changeIdentifier":"PM_MEAS_FILES", + "feedName":"feed00" + }, + "streams_subscribes":{ + "dmaap_subscriber":{ + "dmmap_info":{ + "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + }, + "type":"message_router" + } + }, + "feed00":{ + "username":"user", + "log_url":"https://dmaap.example.com/feedlog/972", + "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1", + "location":"loc00", + "password":"password", + "publisher_id":"972.360gm" + } +} diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json new file mode 100644 index 00000000..a7e2497d --- /dev/null +++ b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json @@ -0,0 +1,49 @@ +{ + "dmaap.ftpesConfig.keyCert":"/config/dfc.jks", + "dmaap.ftpesConfig.keyPassword":"secret", + "dmaap.ftpesConfig.trustedCa":"config/ftp.jks", + "dmaap.ftpesConfig.trustedCaPassword":"secret", + "dmaap.security.trustStorePath":"trustStorePath", + "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath", + "dmaap.security.keyStorePath":"keyStorePath", + "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath", + "dmaap.security.enableDmaapCertAuth":"true", + "dmaap.dmaapProducerConfiguration":[ + { + "changeIdentifier":"PM_MEAS_FILES", + "feedName":"feed00" + }, + { + "changeIdentifier":"XX_FILES", + "feedName":"feed01" + }, + { + "changeIdentifier":"YY_FILES", + "feedName":"feed01" + } + ], + "streams_subscribes":{ + "dmaap_subscriber":{ + "dmmap_info":{ + "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + }, + "type":"message_router" + } + }, + "feed00":{ + "username":"user", + "log_url":"https://dmaap.example.com/feedlog/972", + "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1", + "location":"loc00", + "password":"password", + "publisher_id":"972.360gm" + }, + "feed01":{ + "username":"user", + "log_url":"feed01::log_url", + "publish_url":"feed01::publish_url", + "location":"loc00", + "password":"", + "publisher_id":"" + } +} |