Diffstat (limited to 'datafile-app-server/src/main/java')
25 files changed, 620 insertions, 369 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index a38eab8f..d324ca99 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -22,22 +22,38 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.time.Duration; +import java.util.Map; +import java.util.Properties; import java.util.ServiceLoader; + import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; + +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + /** * Holds all configuration for the DFC. 
* @@ -46,105 +62,174 @@ import org.springframework.stereotype.Component; */ @Component +@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers") @EnableConfigurationProperties @ConfigurationProperties("app") public class AppConfig { - - private static final String CONFIG = "configs"; - private static final String DMAAP = "dmaap"; - private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; - private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; - private static final String FTP = "ftp"; - private static final String FTPES_CONFIGURATION = "ftpesConfiguration"; - private static final String SECURITY = "security"; private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); - private DmaapConsumerConfiguration dmaapConsumerConfiguration; - private DmaapPublisherConfiguration dmaapPublisherConfiguration; + private ConsumerConfiguration dmaapConsumerConfiguration; + private Map<String, PublisherConfiguration> publishingConfiguration; private FtpesConfig ftpesConfiguration; + private CloudConfigurationProvider cloudConfigurationProvider; + @Value("#{systemEnvironment}") + Properties systemEnvironment; + private Disposable refreshConfigTask = null; @NotEmpty private String filepath; - public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + @Autowired + public synchronized void setCloudConfigurationProvider( + CloudConfigurationProvider reactiveCloudConfigurationProvider) { + this.cloudConfigurationProvider = reactiveCloudConfigurationProvider; + } + + public synchronized void setFilepath(String filepath) { + this.filepath = filepath; + } + + /** + * Reads the cloud configuration. + */ + public void initialize() { + stop(); + Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); + loadConfigurationFromFile(); + + refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5)) + .flatMap(count -> createRefreshConfigurationTask(count, context)) + .subscribe(e -> logger.info("Refreshed configuration data"), + throwable -> logger.error("Configuration refresh terminated due to exception", throwable), + () -> logger.error("Configuration refresh terminated")); + } + + public void stop() { + if (refreshConfigTask != null) { + refreshConfigTask.dispose(); + refreshConfigTask = null; + } + } + + public synchronized ConsumerConfiguration getDmaapConsumerConfiguration() { return dmaapConsumerConfiguration; } - public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return dmaapPublisherConfiguration; + public synchronized boolean isFeedConfigured(String changeIdentifier) { + return publishingConfiguration.containsKey(changeIdentifier); + } + + public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier) + throws DatafileTaskException { + + if (publishingConfiguration == null) { + throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier); + } + PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier); + if (cfg == null) { + throw new DatafileTaskException( + "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier); + } + return cfg; } public synchronized FtpesConfig getFtpesConfiguration() { return ftpesConfiguration; } + Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) { + return Flux.just(counter) // + .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) // + 
.flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) // + .flatMap(this::fetchConfiguration); + } + + Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) { + return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) + .onErrorResume(AppConfig::onErrorResume); + } + + private static <R> Mono<R> onErrorResume(Throwable trowable) { + logger.error("Could not refresh application configuration {}", trowable.toString()); + return Mono.empty(); + } + + private Mono<AppConfig> fetchConfiguration(EnvProperties env) { + Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) // + .onErrorResume(AppConfig::onErrorResume); + + // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the + // other ones does not work + EnvProperties dmaapEnv = ImmutableEnvProperties.builder() // + .consulHost(env.consulHost()) // + .consulPort(env.consulPort()) // + .cbsName(env.cbsName()) // + .appName(env.appName() + ":dmaap") // + .build(); // + Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv) + .onErrorResume(t -> Mono.just(new JsonObject())); + + return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) // + .onErrorResume(AppConfig::onErrorResume); + } + + /** + * parse configuration + * + * @param serviceConfigRootObject + * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken + * from the serviceConfigRootObject + * @return this which is updated if successful + */ + private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) { + try { + CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject); + setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(), + parser.getFtpesConfig()); + } catch (DatafileTaskException e) { + logger.error("Could not parse configuration {}", e.toString(), e); + } + return this; + } + /** * Reads the configuration from file. 
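The initialize()/stop() pair and createRefreshConfigurationTask() above rely on a Reactor idiom: a Flux.interval drives repeated configuration fetches, and the subscription is kept as a Disposable so it can be cancelled. The following is a minimal, self-contained sketch of that pattern only; RefreshableConfig and its fetchConfiguration() are hypothetical stand-ins and are not part of this change.

import java.time.Duration;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Sketch of the periodic-refresh pattern used by AppConfig.initialize()/stop().
public class RefreshableConfig {

    private Disposable refreshTask;

    public void initialize() {
        stop(); // ensure only one refresh loop is running
        refreshTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5))
            .flatMap(count -> fetchConfiguration()) // one fetch per tick
            .subscribe(
                cfg -> System.out.println("Refreshed configuration"),
                throwable -> System.err.println("Refresh terminated: " + throwable),
                () -> System.err.println("Refresh completed unexpectedly"));
    }

    public void stop() {
        if (refreshTask != null) {
            refreshTask.dispose(); // cancels the interval subscription
            refreshTask = null;
        }
    }

    private Mono<String> fetchConfiguration() {
        // Placeholder; the real class calls the CBS client and parses JSON.
        return Mono.just("config")
            .onErrorResume(t -> Mono.empty()); // a failed poll must not stop the loop
    }
}

Because a failed fetch resumes with an empty Mono, one bad poll does not terminate the interval, which is the behaviour the onErrorResume calls in the diff aim for.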
*/ - public void loadConfigurationFromFile() { + void loadConfigurationFromFile() { GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); - JsonParser parser = new JsonParser(); - JsonObject jsonObject; + try (InputStream inputStream = createInputStream(filepath)) { - JsonElement rootElement = getJsonElement(parser, inputStream); - if (rootElement.isJsonObject()) { - jsonObject = rootElement.getAsJsonObject(); - FtpesConfig ftpesConfig = deserializeType(gsonBuilder, - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION), - FtpesConfig.class); - DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder, - concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) - .getAsJsonObject(DMAAP_CONSUMER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapConsumerConfiguration.class); - - DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder, - concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) - .getAsJsonObject(DMAAP_PRODUCER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapPublisherConfiguration.class); - - setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig); + JsonParser parser = new JsonParser(); + JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject(); + if (rootObject == null) { + throw new JsonSyntaxException("Root is not a json object"); } + parseCloudConfig(rootObject, rootObject); } catch (JsonSyntaxException | IOException e) { - logger.error("Problem with loading configuration, file: {}", filepath, e); + logger.warn("Local configuration file not loaded: {}", filepath, e); } } - synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration, - DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) { - this.dmaapConsumerConfiguration = consumerConfiguration; - this.dmaapPublisherConfiguration = publisherConfiguration; - this.ftpesConfiguration = ftpesConfig; + private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration, + Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) { + if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) { + logger.error( + "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}", + consumerConfiguration, publisherConfiguration, ftpesConfig); + } else { + this.dmaapConsumerConfiguration = consumerConfiguration; + this.publishingConfiguration = publisherConfiguration; + this.ftpesConfiguration = ftpesConfig; + } } JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { return parser.parse(new InputStreamReader(inputStream)); } - private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject, - @NotNull Class<T> type) { - return gsonBuilder.create().fromJson(jsonObject, type); - } - InputStream createInputStream(@NotNull String filepath) throws IOException { return new BufferedInputStream(new FileInputStream(filepath)); } - synchronized String getFilepath() { - return this.filepath; - } - - public synchronized void setFilepath(String filepath) { - this.filepath = filepath; - } - - private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) { - 
source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue())); - return target; - } - } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index 6b7860c4..3ac6b2c6 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -18,11 +18,17 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; /** @@ -32,63 +38,106 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ public class CloudConfigParser { - private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath"; private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath"; private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath"; private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath"; private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth"; - private final JsonObject jsonObject; + private final JsonObject serviceConfigurationRoot; + private final JsonObject dmaapConfigurationRoot; - CloudConfigParser(JsonObject jsonObject) { - this.jsonObject = jsonObject; + public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) { + this.serviceConfigurationRoot = serviceConfigurationRoot; + this.dmaapConfigurationRoot = dmaapConfigurationRoot; } - DmaapPublisherConfiguration getDmaapPublisherConfig() { - return new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString()) - .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) - .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) - .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString()) - 
.trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // - .build(); + public Map<String, PublisherConfiguration> getDmaapPublisherConfig() throws DatafileTaskException { + Iterator<JsonElement> producerCfgs = + toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator(); + + Map<String, PublisherConfiguration> result = new HashMap<>(); + + while (producerCfgs.hasNext()) { + JsonObject producerCfg = producerCfgs.next().getAsJsonObject(); + String feedName = getAsString(producerCfg, "feedName"); + JsonObject feedConfig = getFeedConfig(feedName); + + PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() // + .publishUrl(getAsString(feedConfig, "publish_url")) // + .passWord(getAsString(feedConfig, "password")) // + .userName(getAsString(feedConfig, "username")) // + .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) // + .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // + .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) // + .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) // + .enableDmaapCertAuth( + get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) // + .logUrl(getAsString(feedConfig, "log_url")) // + .build(); + + result.put(cfg.changeIdentifier(), cfg); + } + return result; } - DmaapConsumerConfiguration getDmaapConsumerConfig() { - return new ImmutableDmaapConsumerConfiguration.Builder() - .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt()) - .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString()) - .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString()) - .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString()) - .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) - .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) - .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException { + JsonObject consumerCfg = 
serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject(); + Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet(); + if (topics.size() != 1) { + throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics); + } + JsonObject topic = topics.iterator().next().getValue().getAsJsonObject(); + JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject(); + String topicUrl = getAsString(dmaapInfo, "topic_url"); + + return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl) + .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) + .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) + .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) + .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) + .enableDmaapCertAuth( + get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // .build(); } - FtpesConfig getFtpesConfig() { + public FtpesConfig getFtpesConfig() throws DatafileTaskException { return new ImmutableFtpesConfig.Builder() // - .keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString()) - .keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString()) - .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString()) - .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) // + .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert")) + .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword")) + .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa")) + .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) // .build(); } + + private static JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException { + JsonElement elem = obj.get(memberName); + if (elem == null) { + throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj); + } + return elem; + } + + private static String getAsString(JsonObject obj, String memberName) throws DatafileTaskException { + return get(obj, memberName).getAsString(); + } + + private JsonObject getFeedConfig(String feedName) throws DatafileTaskException { + JsonElement elem = dmaapConfigurationRoot.get(feedName); + if (elem == null) { + elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under + // serviceConfigurationRoot + } + return elem.getAsJsonObject(); + } + + private static JsonArray toArray(JsonElement obj) { + if (obj.isJsonArray()) { + return obj.getAsJsonArray(); + } + JsonArray arr = new JsonArray(); + arr.add(obj); + return arr; + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java deleted file mode 100644 index 597f525f..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java +++ /dev/null @@ -1,110 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. 
- * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.configuration; - -import com.google.gson.JsonObject; -import java.util.Map; -import java.util.Properties; -import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Primary; -import org.springframework.scheduling.annotation.EnableScheduling; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -/** - * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC. - * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -@Configuration -@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers") -@EnableConfigurationProperties -@EnableScheduling -@Primary -public class CloudConfiguration extends AppConfig { - private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class); - private ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider; - private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration; - private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration; - private FtpesConfig ftpesCloudConfiguration; - - @Value("#{systemEnvironment}") - private Properties systemEnvironment; - - @Autowired - public synchronized void setThreadPoolTaskScheduler( - ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) { - this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider; - } - - /** - * Reads the cloud configuration. 
- */ - public void runTask() { - Map<String,String> context = MappedDiagnosticContext.initializeTraceContext(); - EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) // - .subscribeOn(Schedulers.parallel()) // - .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) // - .flatMap(this::parseCloudConfig) // - .subscribe(null, this::onError, this::onComplete); - } - - private void onComplete() { - logger.trace("Configuration updated"); - } - - private void onError(Throwable throwable) { - logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); - } - - private synchronized Mono<CloudConfiguration> parseCloudConfig(JsonObject jsonObject) { - logger.info("Received application configuration: {}", jsonObject); - CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); - dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); - dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig(); - ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig(); - return Mono.just(this); - } - - @Override - public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration - : super.getDmaapPublisherConfiguration(); - } - - @Override - public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration - : super.getDmaapConsumerConfiguration(); - } - - @Override - public synchronized FtpesConfig getFtpesConfiguration() { - return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java new file mode 100644 index 00000000..fc9ab204 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java @@ -0,0 +1,105 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import java.net.MalformedURLException; +import java.net.URL; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; + +@Value.Immutable +@Value.Style(redactedMask = "####") +@Gson.TypeAdapters +public abstract class ConsumerConfiguration { + @Value.Redacted + public abstract String topicUrl(); + + public abstract String trustStorePath(); + + public abstract String trustStorePasswordPath(); + + public abstract String keyStorePath(); + + public abstract String keyStorePasswordPath(); + + public abstract Boolean enableDmaapCertAuth(); + + public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException { + try { + URL url = new URL(topicUrl()); + String passwd = ""; + String userName = ""; + if (url.getUserInfo() != null) { + String[] userInfo = url.getUserInfo().split(":"); + userName = userInfo[0]; + passwd = userInfo[1]; + } + String urlPath = url.getPath(); + DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); + + return new ImmutableDmaapConsumerConfiguration.Builder() // + .dmaapContentType("application/json") // + .dmaapPortNumber(url.getPort()) // + .dmaapHostName(url.getHost()) // + .dmaapTopicName(path.dmaapTopicName) // + .dmaapProtocol(url.getProtocol()) // + .dmaapUserName(userName) // + .dmaapUserPassword(passwd) // + .trustStorePath(this.trustStorePath()) // + .trustStorePasswordPath(this.trustStorePasswordPath()) // + .keyStorePath(this.keyStorePath()) // + .keyStorePasswordPath(this.keyStorePasswordPath()) // + .enableDmaapCertAuth(this.enableDmaapCertAuth()) // + .consumerId(path.consumerId) // + .consumerGroup(path.consumerGroup) // + .timeoutMs(-1) // + .messageLimit(-1) // + .build(); + } catch (MalformedURLException e) { + throw new DatafileTaskException("Could not parse the URL", e); + } + } + + private class DmaapConsumerUrlPath { + final String dmaapTopicName; + final String consumerGroup; + final String consumerId; + + DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { + this.dmaapTopicName = dmaapTopicName; + this.consumerGroup = consumerGroup; + this.consumerId = consumerId; + } + } + + private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException { + String[] tokens = urlPath.split("/"); // UrlPath: /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12 + if (tokens.length != 5) { + throw new DatafileTaskException("The path has incorrect syntax: " + urlPath); + } + + final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // ex. // /events/unauthenticated.VES_NOTIFICATION_OUTPUT + final String consumerGroup = tokens[3]; // ex. OpenDcae-c12 + final String consumerId = tokens[4]; // ex. 
C12 + return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId); + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java index 71003f80..62af92a8 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -19,12 +19,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.util.Map; import java.util.Optional; import java.util.Properties; + import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; + import reactor.core.publisher.Mono; /** @@ -53,7 +55,7 @@ class EnvironmentProcessor { } catch (EnvironmentLoaderException e) { return Mono.error(e); } - logger.info("Evaluated environment system variables {}", envProperties); + logger.trace("Evaluated environment system variables {}", envProperties); return Mono.just(envProperties); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java index 3f029359..844699eb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java @@ -21,6 +21,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.io.Serializable; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.springframework.stereotype.Component; @@ -28,7 +29,7 @@ import org.springframework.stereotype.Component; @Component @Value.Immutable -@Value.Style(builder = "new") +@Value.Style(builder = "new", redactedMask = "####") @Gson.TypeAdapters public abstract class FtpesConfig implements Serializable { @@ -38,11 +39,13 @@ public abstract class FtpesConfig implements Serializable { public abstract String keyCert(); @Value.Parameter + @Value.Redacted public abstract String keyPassword(); @Value.Parameter public abstract String trustedCa(); @Value.Parameter + @Value.Redacted public abstract String trustedCaPassword(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java new file mode 100644 index 00000000..5576ed26 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java @@ -0,0 +1,74 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved. 
+ * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import java.net.MalformedURLException; +import java.net.URL; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + + +@Value.Immutable +@Value.Style(redactedMask = "####") +@Gson.TypeAdapters +public interface PublisherConfiguration { + + String publishUrl(); + + String logUrl(); + + String userName(); + + @Value.Redacted + String passWord(); + + String trustStorePath(); + + String trustStorePasswordPath(); + + String keyStorePath(); + + String keyStorePasswordPath(); + + Boolean enableDmaapCertAuth(); + + String changeIdentifier(); + + default DmaapPublisherConfiguration toDmaap() throws MalformedURLException { + URL url = new URL(publishUrl()); + String urlPath = url.getPath(); + + return new ImmutableDmaapPublisherConfiguration.Builder() // + .dmaapContentType("application/octet-stream") // + .dmaapPortNumber(url.getPort()) // + .dmaapHostName(url.getHost()) // + .dmaapTopicName(urlPath) // + .dmaapProtocol(url.getProtocol()) // + .dmaapUserName(this.userName()) // + .dmaapUserPassword(this.passWord()) // + .trustStorePath(this.trustStorePath()) // + .trustStorePasswordPath(this.trustStorePasswordPath()) // + .keyStorePath(this.keyStorePath()) // + .keyStorePasswordPath(this.keyStorePasswordPath()) // + .enableDmaapCertAuth(this.enableDmaapCertAuth()) // + .build(); + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index b78e4ae5..5835de1a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -16,7 +16,6 @@ package org.onap.dcaegen2.collectors.datafile.configuration; -import io.swagger.annotations.ApiOperation; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -24,7 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; + import javax.annotation.PostConstruct; + import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.slf4j.Logger; @@ -36,6 +37,8 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import 
org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; + +import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; /** @@ -49,7 +52,6 @@ import reactor.core.publisher.Mono; public class SchedulerConfig { private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15); - private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5); private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1); private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class); private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); @@ -57,21 +59,21 @@ public class SchedulerConfig { private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; - private final CloudConfiguration cloudConfiguration; + private final AppConfig configuration; /** * Constructor. * * @param taskScheduler The scheduler used to schedule the tasks. * @param scheduledTasks The scheduler that will actually handle the tasks. - * @param cloudConfiguration The DFC configuration. + * @param configuration The DFC configuration. */ @Autowired public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks, - CloudConfiguration cloudConfiguration) { + AppConfig configuration) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTasks; - this.cloudConfiguration = cloudConfiguration; + this.configuration = configuration; } /** @@ -83,6 +85,7 @@ public class SchedulerConfig { public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() { scheduledFutureList.forEach(x -> x.cancel(false)); scheduledFutureList.clear(); + configuration.stop(); MDC.setContextMap(contextMap); logger.info("Stopped Datafile workflow"); MDC.clear(); @@ -99,12 +102,11 @@ public class SchedulerConfig { public synchronized boolean tryToStartTask() { contextMap = MappedDiagnosticContext.initializeTraceContext(); logger.info("Start scheduling Datafile workflow"); + configuration.initialize(); + if (scheduledFutureList.isEmpty()) { - scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, - Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); - scheduledFutureList.add( - taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask, - SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); + scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask, + SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); scheduledFutureList .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java index 71242265..5a78261a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java @@ -62,7 +62,7 @@ public class SwaggerConfig extends WebMvcConfigurationSupport { .build(); } - private ApiInfo apiInfo() { + private static ApiInfo apiInfo() { return new ApiInfoBuilder() // .title(API_TITLE) // .description(DESCRIPTION) // diff --git 
a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java index cbd67297..847ca46e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java @@ -41,7 +41,7 @@ public class TomcatHttpConfig { return tomcat; } - private Connector getHttpConnector() { + private static Connector getHttpConnector() { Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL); connector.setScheme("http"); connector.setPort(8100); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index 4716fa87..791f0cf1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -71,7 +71,7 @@ public class ScheduleController { public Mono<ResponseEntity<String>> startTasks() { return Mono.fromSupplier(schedulerConfig::tryToStartTask) // - .map(this::createStartTaskResponse); + .map(ScheduleController::createStartTaskResponse); } /** @@ -90,7 +90,7 @@ public class ScheduleController { } @ApiOperation(value = "Sends success or error response on starting task execution") - private ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) { + private static ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) { if (wasScheduled) { return new ResponseEntity<>("Datafile Service has been started!", HttpStatus.CREATED); } else { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java index 4c49dd8a..72623db2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java @@ -17,6 +17,7 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import java.util.Optional; + import org.immutables.value.Value; /** @@ -26,11 +27,13 @@ import org.immutables.value.Value; * */ @Value.Immutable +@Value.Style(redactedMask = "####") public interface FileServerData { public String serverAddress(); public String userId(); + @Value.Redacted public String password(); public Optional<Integer> port(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index c78ae3a3..bb3016ce 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -128,7 +128,7 @@ public class FtpsClient implements FileCollectClient { logger.trace("collectFile fetched: {}", localFileName); } - private int getPort(Optional<Integer> port) { + private static int getPort(Optional<Integer> port) { return port.isPresent() ? 
port.get() : FTPS_DEFAULT_PORT; } @@ -180,7 +180,7 @@ public class FtpsClient implements FileCollectClient { logger.warn("Local file {} already created", localFileName); } OutputStream output = new FileOutputStream(localFile); - logger.debug("File {} opened xNF", localFileName); + logger.trace("File {} opened xNF", localFileName); return output; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index 333be92a..bdaf6d4c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -55,7 +55,7 @@ public class SftpClient implements FileCollectClient { try { sftpChannel.get(remoteFile, localFile.toString()); - logger.debug("File {} Download Successfull from xNF", localFile.getFileName()); + logger.trace("File {} Download Successfull from xNF", localFile.getFileName()); } catch (SftpException e) { boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED; throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry); @@ -90,11 +90,11 @@ public class SftpClient implements FileCollectClient { } } - private int getPort(Optional<Integer> port) { + private static int getPort(Optional<Integer> port) { return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; } - private Session setUpSession(FileServerData fileServerData) throws JSchException { + private static Session setUpSession(FileServerData fileServerData) throws JSchException { JSch jsch = new JSch(); Session newSession = @@ -105,7 +105,7 @@ public class SftpClient implements FileCollectClient { return newSession; } - private ChannelSftp getChannel(Session session) throws JSchException { + private static ChannelSftp getChannel(Session session) throws JSchException { Channel channel = session.openChannel("sftp"); channel.connect(); return (ChannelSftp) channel; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 0a6b669c..36aae949 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -38,6 +38,7 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; */ @Value.Immutable @Gson.TypeAdapters +@Value.Style(redactedMask = "####") public abstract class FileData { public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/"; @@ -53,6 +54,7 @@ public abstract class FileData { * * @return the URL to use to fetch the file from the PNF */ + @Value.Redacted public abstract String location(); /** @@ -123,7 +125,7 @@ public abstract class FileData { * @return An <code>Optional</code> containing a String array with the user name and password if given, or an empty * <code>Optional</code> if not given. 
*/ - private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) { + private static Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) { if (userInfoString != null) { String[] userAndPassword = userInfoString.split(":"); if (userAndPassword.length == 2) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java index 63ed0daa..5b8c015e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java @@ -33,6 +33,7 @@ import org.immutables.value.Value; @Value.Immutable @Gson.TypeAdapters +@Value.Style(redactedMask = "####") public interface FilePublishInformation { @SerializedName("productName") @@ -54,6 +55,7 @@ public interface FilePublishInformation { String getTimeZoneOffset(); @SerializedName("location") + @Value.Redacted String getLocation(); @SerializedName("compression") @@ -70,4 +72,6 @@ public interface FilePublishInformation { String getName(); Map<String, String> getContext(); + + String getChangeIdentifier(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java index 7081d1ac..b8df125b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java @@ -31,12 +31,12 @@ import java.util.Set; */ public abstract class JsonSerializer { + private JsonSerializer() {} - private static Gson gson = - new GsonBuilder() // - .serializeNulls() // - .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) // - .create(); // + private static Gson gson = new GsonBuilder() // + .serializeNulls() // + .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) // + .create(); // /** * Serializes a <code>filePublishInformation</code>. 
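The JsonSerializer hunk continuing below whitelists fields through a Gson ExclusionStrategy. As an illustration of that technique only, here is a minimal standalone sketch; the Sample class and its field names are hypothetical and not taken from this change.

import java.util.Set;

import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class WhitelistSerialization {

    // Only fields whose names appear in this set are serialized.
    private static final Set<String> INCLUSIONS = Set.of("productName", "location");

    static class Sample { // hypothetical payload
        String productName = "RnNode";
        String location = "ftpes://host/file.xml";
        String internalState = "not for publication";
    }

    public static void main(String[] args) {
        Gson gson = new GsonBuilder()
            .serializeNulls()
            .addSerializationExclusionStrategy(new ExclusionStrategy() {
                @Override
                public boolean shouldSkipField(FieldAttributes fieldAttributes) {
                    return !INCLUSIONS.contains(fieldAttributes.getName()); // whitelist by name
                }

                @Override
                public boolean shouldSkipClass(Class<?> clazz) {
                    return false; // never skip whole classes
                }
            })
            .create();

        // Prints only productName and location; internalState is skipped.
        System.out.println(gson.toJson(new Sample()));
    }
}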
@@ -56,9 +56,10 @@ public abstract class JsonSerializer { private final Set<String> inclusions = Sets.newHashSet("productName", "vendorName", "lastEpochMicrosec", "sourceName", "startEpochMicrosec", "timeZoneOffset", "location", "compression", "fileFormatType", "fileFormatVersion"); + @Override - public boolean shouldSkipField(FieldAttributes f) { - return !inclusions.contains(f.getName()); + public boolean shouldSkipField(FieldAttributes fieldAttributes) { + return !inclusions.contains(fieldAttributes.getName()); } @Override diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 3a3eb3aa..470c4e73 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -22,11 +22,13 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; + import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; + import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -37,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -70,7 +73,6 @@ public class JsonMessageParser { private static final String FILE_FORMAT_VERSION = "fileFormatVersion"; private static final String FILE_READY_CHANGE_TYPE = "FileReady"; - private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES"; /** * The data types available in the event name. @@ -92,7 +94,7 @@ public class JsonMessageParser { * @return a <code>Flux</code> containing messages. */ public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) { - return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData); + return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData); } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { @@ -123,18 +125,17 @@ public class JsonMessageParser { : getMessagesFromJsonArray(jsonElement); } - private Mono<JsonElement> getJsonParserMessage(String message) { - logger.trace("original message from message router: {}", message); + private static Mono<JsonElement> getJsonParserMessage(String message) { return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); } - private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { + private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP) : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. 
" + jsonObject)); } - private Mono<FileReadyMessage> transformMessages(JsonObject message) { + private static Mono<FileReadyMessage> transformMessages(JsonObject message) { Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { MessageMetaData messageMetaData = optionalMessageMetaData.get(); @@ -159,7 +160,7 @@ public class JsonMessageParser { } - private Optional<MessageMetaData> getMessageMetaData(JsonObject message) { + private static Optional<MessageMetaData> getMessageMetaData(JsonObject message) { List<String> missingValues = new ArrayList<>(); JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues); @@ -182,15 +183,15 @@ public class JsonMessageParser { .changeIdentifier(changeIdentifier) // .changeType(changeType) // .build(); - if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { + if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) { return Optional.of(messageMetaData); } else { String errorMessage = "VES event parsing."; if (!missingValues.isEmpty()) { errorMessage += " Missing data: " + missingValues; } - if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) { - errorMessage += " Change identifier or change type is wrong."; + if (!isChangeTypeCorrect(changeType)) { + errorMessage += " Change type is wrong: " + changeType + " expected: " + FILE_READY_CHANGE_TYPE; } errorMessage += " Message: {}"; logger.error(errorMessage, message); @@ -198,15 +199,12 @@ public class JsonMessageParser { } } - private boolean isChangeTypeCorrect(String changeType) { + private static boolean isChangeTypeCorrect(String changeType) { return FILE_READY_CHANGE_TYPE.equals(changeType); } - private boolean isChangeIdentifierCorrect(String changeIdentifier) { - return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); - } - - private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) { + private static List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, + MessageMetaData messageMetaData) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); @@ -219,7 +217,7 @@ public class JsonMessageParser { return res; } - private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) { + private static Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) { logger.trace("starting to getFileDataFromJson!"); List<String> missingValues = new ArrayList<>(); @@ -250,15 +248,17 @@ public class JsonMessageParser { } /** - * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, - * example: Noti_RnNode-Ericsson_FileReady + * Gets data from the event name. Defined as: + * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example: + * Noti_RnNode-Ericsson_FileReady * * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}. * @param eventName The event name to get the data from. * @param missingValues List of missing values. The dataType will be added if missing. 
* @return String of data from event name */ - private String getDataFromEventName(EventNameDataType dataType, String eventName, List<String> missingValues) { + private static String getDataFromEventName(EventNameDataType dataType, String eventName, + List<String> missingValues) { String[] eventArray = eventName.split("_|-"); if (eventArray.length >= 4) { return eventArray[dataType.index]; @@ -269,7 +269,7 @@ public class JsonMessageParser { return ""; } - private String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) { + private static String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) { if (jsonObject.has(jsonKey)) { return jsonObject.get(jsonKey).getAsString(); } else { @@ -278,11 +278,11 @@ public class JsonMessageParser { } } - private boolean containsNotificationFields(JsonObject jsonObject) { + private static boolean containsNotificationFields(JsonObject jsonObject) { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } - private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) { + private static Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) { logger.error(errorMessage); return Flux.empty(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java index 2b3a0ef6..475e0224 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -28,7 +28,7 @@ import java.util.Map; * the key was last used. */ public class PublishedFileCache { - private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>()); + private final Map<Path, Instant> publishedFiles = new HashMap<Path, Instant>(); /** * Adds a file to the cache. @@ -36,7 +36,7 @@ public class PublishedFileCache { * @param path the name of the file to add. * @return <code>null</code> if the file is not already in the cache. */ - public Instant put(Path path) { + public synchronized Instant put(Path path) { return publishedFiles.put(path, Instant.now()); } @@ -45,7 +45,7 @@ public class PublishedFileCache { * * @param localFileName name of the file to remove. */ - public void remove(Path localFileName) { + public synchronized void remove(Path localFileName) { publishedFiles.remove(localFileName); } @@ -54,7 +54,7 @@ public class PublishedFileCache { * * @param now the instant will determine which files that will be purged. 
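(Editorial note) getDataFromEventName above relies on the event name following the documented {DomainAbbreviation}_{productName}-{vendorName}_{Description} layout and splits it on "_" or "-". A tiny worked example using the event name from the javadoc; the index values are assumptions standing in for EventNameDataType, not the real enum constants.

    public class EventNameSplitExample {
        public static void main(String[] args) {
            String eventName = "Noti_RnNode-Ericsson_FileReady";
            String[] parts = eventName.split("_|-");
            // parts: ["Noti", "RnNode", "Ericsson", "FileReady"]
            int productNameIndex = 1; // assumed index, mirroring EventNameDataType.PRODUCT_NAME
            int vendorNameIndex = 2;  // assumed index, mirroring EventNameDataType.VENDOR_NAME
            if (parts.length >= 4) {
                System.out.println("productName = " + parts[productNameIndex]); // RnNode
                System.out.println("vendorName  = " + parts[vendorNameIndex]);  // Ericsson
            } else {
                System.out.println("Unexpected event name format: " + eventName);
            }
        }
    }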
*/ - public void purge(Instant now) { + public synchronized void purge(Instant now) { for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) { Map.Entry<Path, Instant> pair = it.next(); if (isCachedPublishedFileOutdated(now, pair.getValue())) { @@ -63,11 +63,11 @@ public class PublishedFileCache { } } - public int size() { + public synchronized int size() { return publishedFiles.size(); } - private boolean isCachedPublishedFileOutdated(Instant now, Instant then) { + private static boolean isCachedPublishedFileOutdated(Instant now, Instant then) { final int timeToKeepInfoInSeconds = 60 * 60 * 24; return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java index 8d433827..198c1bf1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java @@ -27,7 +27,9 @@ import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.util.Map; import java.util.concurrent.Future; + import javax.net.ssl.SSLContext; + import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; @@ -44,8 +46,6 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; -import org.springframework.web.util.DefaultUriBuilderFactory; -import org.springframework.web.util.UriBuilder; /** * Client used to send requests to DataRouter. @@ -139,22 +139,8 @@ public class DmaapProducerHttpClient { request.addHeader("Authorization", "Basic " + base64Creds); } - /** - * Gets a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. Specific parts can then be - * added to the URI by the user. - * - * @return a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. 
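(Editorial note) The PublishedFileCache hunks above drop Collections.synchronizedMap in favour of synchronized methods around a plain HashMap, which also makes the iterating purge atomic. A simplified sketch of that pattern; the class name and the 24-hour retention constant are illustrative.

    import java.nio.file.Path;
    import java.time.Instant;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class TimedPathCache {
        // A plain HashMap is enough because every access goes through a synchronized method.
        private final Map<Path, Instant> entries = new HashMap<>();
        private static final long RETENTION_SECONDS = 60 * 60 * 24; // illustrative retention period

        public synchronized Instant put(Path path) {
            return entries.put(path, Instant.now());
        }

        public synchronized void remove(Path path) {
            entries.remove(path);
        }

        public synchronized void purge(Instant now) {
            // Safe to iterate and remove here: the whole loop holds the monitor.
            for (Iterator<Map.Entry<Path, Instant>> it = entries.entrySet().iterator(); it.hasNext();) {
                Map.Entry<Path, Instant> entry = it.next();
                if (now.getEpochSecond() - entry.getValue().getEpochSecond() > RETENTION_SECONDS) {
                    it.remove();
                }
            }
        }

        public synchronized int size() {
            return entries.size();
        }
    }

A synchronizedMap view only guards individual calls; iterating it, as purge does, still needs external synchronization, so moving the lock to the methods is the safer shape.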
- */ - public UriBuilder getBaseUri() { - return new DefaultUriBuilderFactory().builder() // - .scheme(configuration.dmaapProtocol()) // - .host(configuration.dmaapHostName()) // - .port(configuration.dmaapPortNumber()); - } - private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout, - Map<String, String> contextMap) - throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { + Map<String, String> contextMap) throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index e50ef580..f1d33454 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -21,6 +21,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; @@ -29,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.reactive.function.client.WebClient; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -43,7 +45,7 @@ public class DMaaPMessageConsumer { private final JsonMessageParser jsonMessageParser; private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; - public DMaaPMessageConsumer(AppConfig datafileAppConfig) { + public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException { this.jsonMessageParser = new JsonMessageParser(); this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig); } @@ -69,8 +71,9 @@ public class DMaaPMessageConsumer { return jsonMessageParser.getMessagesFromJson(message); } - private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) { - DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration(); + private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) + throws DatafileTaskException { + DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap(); WebClient client = new DmaapWebClient().fromConfiguration(config).build(); return new DMaaPConsumerReactiveHttpClient(config, client); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index e5dd01e9..1d6baa65 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -25,6 +25,7 @@ import com.google.gson.JsonParser; import java.io.IOException; import 
java.io.InputStream; +import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; @@ -34,6 +35,8 @@ import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; @@ -46,6 +49,7 @@ import org.slf4j.MDC; import org.springframework.core.io.FileSystemResource; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; +import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Mono; @@ -58,12 +62,9 @@ import reactor.core.publisher.Mono; public class DataRouterPublisher { private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META"; private static final String CONTENT_TYPE = "application/octet-stream"; - private static final String PUBLISH_TOPIC = "publish"; - private static final String DEFAULT_FEED_ID = "1"; private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final AppConfig datafileAppConfig; - private DmaapProducerHttpClient dmaapProducerReactiveHttpClient; public DataRouterPublisher(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; @@ -80,7 +81,6 @@ public class DataRouterPublisher { public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries, Duration firstBackoff) { MDC.setContextMap(publishInfo.getContext()); - dmaapProducerReactiveHttpClient = resolveClient(); return Mono.just(publishInfo) // .cache() // .flatMap(this::publishFile) // @@ -92,13 +92,14 @@ public class DataRouterPublisher { MDC.setContextMap(publishInfo.getContext()); logger.trace("Entering publishFile with {}", publishInfo); try { + DmaapProducerHttpClient dmaapProducerHttpClient = resolveClient(publishInfo.getChangeIdentifier()); HttpPut put = new HttpPut(); prepareHead(publishInfo, put); prepareBody(publishInfo, put); - dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put); + dmaapProducerHttpClient.addUserCredentialsToHead(put); HttpResponse response = - dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); + dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); logger.trace("{}", response); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { @@ -107,11 +108,18 @@ public class DataRouterPublisher { } } - private void prepareHead(FilePublishInformation publishInfo, HttpPut put) { + private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException { + put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo)); put.addHeader(X_DMAAP_DR_META, metaData.toString()); - put.setURI(getPublishUri(publishInfo.getName())); + URI uri = new DefaultUriBuilderFactory( + datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) // + .builder() // + 
.pathSegment(publishInfo.getName()) // + .build(); + put.setURI(uri); + MappedDiagnosticContext.appendTraceInfo(put); } @@ -122,14 +130,7 @@ public class DataRouterPublisher { } } - private URI getPublishUri(String fileName) { - return dmaapProducerReactiveHttpClient.getBaseUri() // - .pathSegment(PUBLISH_TOPIC) // - .pathSegment(DEFAULT_FEED_ID) // - .pathSegment(fileName).build(); - } - - private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) { + private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); @@ -145,11 +146,17 @@ public class DataRouterPublisher { return realResource.getInputStream(); } - DmaapPublisherConfiguration resolveConfiguration() { - return datafileAppConfig.getDmaapPublisherConfiguration(); + PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException { + return datafileAppConfig.getPublisherConfiguration(changeIdentifer); } - DmaapProducerHttpClient resolveClient() { - return new DmaapProducerHttpClient(resolveConfiguration()); + DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException { + try { + DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap(); + return new DmaapProducerHttpClient(cfg); + } catch (MalformedURLException e) { + throw new DatafileTaskException("Cannot resolve producer client", e); + } + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 0c62795e..6ddcb541 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -74,12 +74,12 @@ public class FileCollector { return Mono.just(fileData) // .cache() // - .flatMap(fd -> collectFile(fileData, contextMap)) // + .flatMap(fd -> tryCollectFile(fileData, contextMap)) // .retryBackoff(numRetries, firstBackoff) // - .flatMap(this::checkCollectedFile); + .flatMap(FileCollector::checkCollectedFile); } - private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) { + private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) { if (info.isPresent()) { return Mono.just(info.get()); } else { @@ -88,7 +88,7 @@ public class FileCollector { } } - private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) { + private Mono<Optional<FilePublishInformation>> tryCollectFile(FileData fileData, Map<String, String> context) { MDC.setContextMap(context); logger.trace("starting to collectFile {}", fileData.name()); @@ -110,7 +110,7 @@ public class FileCollector { } } catch (Exception throwable) { logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), - throwable.toString()); + throwable.toString(), throwable); return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } } @@ -126,7 +126,7 @@ public class FileCollector { } } - private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile, + private static 
FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile, Map<String, String> context) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); @@ -143,6 +143,7 @@ public class FileCollector { .compression(fileData.compression()) // .fileFormatType(fileData.fileFormatType()) // .fileFormatVersion(fileData.fileFormatVersion()) // + .changeIdentifier(fileData.messageMetaData().changeIdentifier()) // .context(context) // .build(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java index e18da248..4d8d679d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -21,18 +21,23 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.io.InputStream; +import java.net.MalformedURLException; import java.net.URI; +import java.net.URISyntaxException; import java.time.Duration; import java.util.Map; + import org.apache.commons.io.IOUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.utils.URIBuilder; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -44,12 +49,9 @@ import org.slf4j.MDC; * */ public class PublishedChecker { - private static final String FEEDLOG_TOPIC = "feedlog"; - private static final String DEFAULT_FEED_ID = "1"; - private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4); + private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4); private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final AppConfig appConfig; /** @@ -66,19 +68,24 @@ public class PublishedChecker { * * @param fileName the name of the file used when it is published. * - * @return <code>true</code> if the file has been published before, <code>false</code> otherwise. + * @return <code>true</code> if the file has been published before, <code>false</code> + * otherwise. 
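(Editorial note) In the DataRouterPublisher hunks above, the publish URI is no longer assembled from a hard-coded "publish/1" path on the producer client; it is built from the publishUrl of the PublisherConfiguration selected by the message's change identifier. A standalone sketch of that URI construction; the URL and file name below are made up.

    import java.net.URI;
    import org.springframework.web.util.DefaultUriBuilderFactory;

    public class PublishUriExample {
        public static void main(String[] args) {
            String publishUrl = "https://dmaap-dr-prov:8443/publish/1";          // illustrative feed publish URL
            String fileName = "A20190401.1030+0200-1045+0200_excl-eeee.xml.gz";  // illustrative PM file name

            // Append the file name as one extra path segment to the configured base URL.
            URI uri = new DefaultUriBuilderFactory(publishUrl)
                    .builder()
                    .pathSegment(fileName)
                    .build();

            System.out.println(uri); // e.g. https://dmaap-dr-prov:8443/publish/1/A20190401...
        }
    }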
+ * @throws DatafileTaskException if the check fails */ - public boolean isFilePublished(String fileName, Map<String, String> contextMap) { + public boolean isFilePublished(String fileName, String changeIdentifier, Map<String, String> contextMap) + throws DatafileTaskException { MDC.setContextMap(contextMap); - DmaapProducerHttpClient producerClient = resolveClient(); + PublisherConfiguration publisherConfig = resolveConfiguration(changeIdentifier); + + DmaapProducerHttpClient producerClient = resolveClient(publisherConfig); HttpGet getRequest = new HttpGet(); MappedDiagnosticContext.appendTraceInfo(getRequest); - getRequest.setURI(getPublishedQueryUri(fileName, producerClient)); - producerClient.addUserCredentialsToHead(getRequest); - try { + getRequest.setURI(getPublishedQueryUri(fileName, publisherConfig)); + producerClient.addUserCredentialsToHead(getRequest); + HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap); @@ -95,20 +102,23 @@ public class PublishedChecker { } } - private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) { - return producerClient.getBaseUri() // - .pathSegment(FEEDLOG_TOPIC) // - .pathSegment(DEFAULT_FEED_ID) // - .queryParam("type", "pub") // - .queryParam("filename", fileName) // + private static URI getPublishedQueryUri(String fileName, PublisherConfiguration config) throws URISyntaxException { + return new URIBuilder(config.logUrl()) // + .addParameter("type", "pub") // + .addParameter("filename", fileName) // .build(); } - protected DmaapPublisherConfiguration resolveConfiguration() { - return appConfig.getDmaapPublisherConfiguration(); + protected PublisherConfiguration resolveConfiguration(String changeIdentifier) throws DatafileTaskException { + return appConfig.getPublisherConfiguration(changeIdentifier); } - protected DmaapProducerHttpClient resolveClient() { - return new DmaapProducerHttpClient(resolveConfiguration()); + protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) + throws DatafileTaskException { + try { + return new DmaapProducerHttpClient(publisherConfig.toDmaap()); + } catch (MalformedURLException e) { + throw new DatafileTaskException("Cannot create published checker client", e); + } } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index b5fa0c24..0f220fde 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -86,13 +87,16 @@ public class ScheduledTasks { threadPoolQueueSize.get()); return; } + if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) { + logger.warn("No configuration loaded, skipping polling for messages"); + return; + } currentNumberOfSubscriptions.incrementAndGet(); Map<String, String> context = 
MappedDiagnosticContext.initializeTraceContext(); logger.trace("Execution of tasks was registered"); - applicationConfiguration.loadConfigurationFromFile(); createMainTask(context) // - .subscribe(this::onSuccess, // + .subscribe(ScheduledTasks::onSuccess, // throwable -> { onError(throwable, context); currentNumberOfSubscriptions.decrementAndGet(); @@ -113,9 +117,10 @@ public class ScheduledTasks { .runOn(scheduler) // .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // - .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // .flatMap(fileData -> createMdcContext(fileData, context)) // + .filter(this::isFeedConfigured) // .filter(this::shouldBePublished) // + .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // .flatMap(this::fetchFile, false, 1, 1) // .flatMap(this::publishToDataRouter, false, 1, 1) // .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) // @@ -124,13 +129,13 @@ public class ScheduledTasks { } private class FileDataWithContext { - FileDataWithContext(FileData fileData, Map<String, String> context) { + public final FileData fileData; + public final Map<String, String> context; + + public FileDataWithContext(FileData fileData, Map<String, String> context) { this.fileData = fileData; this.context = context; } - - final FileData fileData; - final Map<String, String> context; } /** @@ -160,7 +165,7 @@ public class ScheduledTasks { return this.threadPoolQueueSize.get(); } - protected DMaaPMessageConsumer createConsumerTask() { + protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { return new DMaaPMessageConsumer(this.applicationConfiguration); } @@ -172,17 +177,17 @@ public class ScheduledTasks { return new DataRouterPublisher(applicationConfiguration); } - private void onComplete(Map<String, String> contextMap) { + private static void onComplete(Map<String, String> contextMap) { MDC.setContextMap(contextMap); logger.trace("Datafile tasks have been completed"); } - private synchronized void onSuccess(FilePublishInformation publishInfo) { + private static synchronized void onSuccess(FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); logger.info("Datafile file published {}", publishInfo.getInternalLocation()); } - private void onError(Throwable throwable, Map<String, String> context) { + private static void onError(Throwable throwable, Map<String, String> context) { MDC.setContextMap(context); logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString()); } @@ -194,17 +199,30 @@ public class ScheduledTasks { return Mono.just(pair); } + private boolean isFeedConfigured(FileDataWithContext fileData) { + if (applicationConfiguration.isFeedConfigured(fileData.fileData.messageMetaData().changeIdentifier())) { + return true; + } else { + logger.info("No feed is configured for: {}, file ignored: {}", + fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name()); + return false; + } + } + private boolean shouldBePublished(FileDataWithContext fileData) { - boolean result = false; Path localFilePath = fileData.fileData.getLocalFilePath(); if (publishedFilesCache.put(localFilePath) == null) { - result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context); - } - if (!result) { - currentNumberOfTasks.decrementAndGet(); + try { + boolean result = 
!createPublishedChecker().isFilePublished(fileData.fileData.name(), + fileData.fileData.messageMetaData().changeIdentifier(), fileData.context); + return result; + } catch (DatafileTaskException e) { + logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e); + return true; // Publish it then + } + } else { + return false; } - - return result; } private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) { @@ -248,13 +266,19 @@ public class ScheduledTasks { */ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { logger.info( - "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}", + "Consuming new file ready messages, current number of tasks: {}, published files: {}, " + + "number of subscriptions: {}", getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); Map<String, String> context = MDC.getCopyOfContextMap(); - return createConsumerTask() // - .getMessageRouterResponse() // - .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); + try { + return createConsumerTask() // + .getMessageRouterResponse() // + .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); + } catch (Exception e) { + logger.error("Could not create message consumer task", e); + return Flux.empty(); + } } private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) { @@ -264,7 +288,7 @@ public class ScheduledTasks { return Flux.empty(); } - private void deleteFile(Path localFile, Map<String, String> context) { + private static void deleteFile(Path localFile, Map<String, String> context) { MDC.setContextMap(context); logger.trace("Deleting file: {}", localFile); try { |
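(Editorial note) PublishedChecker above now derives its "already published?" query from the feed's logUrl using Apache HttpClient's URIBuilder instead of hard-coded feedlog path segments. A minimal sketch of that query construction; the log URL and file name are illustrative.

    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.http.client.utils.URIBuilder;

    public class PublishedQueryUriExample {
        public static void main(String[] args) throws URISyntaxException {
            String logUrl = "https://dmaap-dr-prov:8443/feedlog/1";              // illustrative feed log URL
            String fileName = "A20190401.1030+0200-1045+0200_excl-eeee.xml.gz";  // illustrative PM file name

            // Ask DataRouter whether this file name already shows up as published on the feed.
            URI queryUri = new URIBuilder(logUrl)
                    .addParameter("type", "pub")
                    .addParameter("filename", fileName)
                    .build();

            System.out.println(queryUri); // e.g. https://dmaap-dr-prov:8443/feedlog/1?type=pub&filename=...
        }
    }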