diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2019-06-14 11:42:38 +0000 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2019-06-14 11:42:38 +0000 |
commit | 21f0997ab8b1d16b05a0dd5929ff0491ea47c466 (patch) | |
tree | 705d1e4c09899f1e8ed72f50be2bf0b0a2c2e7e5 /datafile-app-server/src/main | |
parent | dafd553cf1694585b35fd7132c6bafdef1e98ed6 (diff) |
Code formatting with maven
This commit only contains code formastting changes done by command:
mvn formatter:format spotless:apply process-sources
Change-Id: I0fb9d166ad5d9171cdeee9f26b6d353bca74069c
Issue-ID: DCAEGEN2-1538
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src/main')
33 files changed, 313 insertions, 297 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java index cc5d12b9..55f4fe73 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java @@ -32,7 +32,7 @@ import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -@SpringBootApplication(exclude = { JacksonAutoConfiguration.class }) +@SpringBootApplication(exclude = {JacksonAutoConfiguration.class}) @EnableScheduling public class MainApp { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index b66be163..8e15deb7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -22,6 +22,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -31,8 +32,10 @@ import java.time.Duration; import java.util.Map; import java.util.Properties; import java.util.ServiceLoader; + import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; @@ -46,6 +49,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; + import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -77,7 +81,7 @@ public class AppConfig { @Autowired public synchronized void setCloudConfigurationProvider( - CloudConfigurationProvider reactiveCloudConfigurationProvider) { + CloudConfigurationProvider reactiveCloudConfigurationProvider) { this.cloudConfigurationProvider = reactiveCloudConfigurationProvider; } @@ -94,10 +98,10 @@ public class AppConfig { loadConfigurationFromFile(); refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5)) - .flatMap(count -> createRefreshConfigurationTask(count, context)) - .subscribe(e -> logger.info("Refreshed configuration data"), - throwable -> logger.error("Configuration refresh terminated due to exception", throwable), - () -> logger.error("Configuration refresh terminated")); + .flatMap(count -> createRefreshConfigurationTask(count, context)) + .subscribe(e -> logger.info("Refreshed configuration data"), + throwable -> logger.error("Configuration refresh terminated due to exception", throwable), + () -> logger.error("Configuration refresh terminated")); } public void stop() { @@ -116,7 +120,7 @@ public class AppConfig { } public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier) - throws DatafileTaskException { + throws DatafileTaskException { if (publishingConfiguration == null) { throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier); @@ -124,7 +128,7 @@ public class AppConfig { PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier); if (cfg == null) { throw new DatafileTaskException( - "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier); + "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier); } return cfg; } @@ -135,14 +139,14 @@ public class AppConfig { Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) { return Flux.just(counter) // - .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) // - .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) // - .flatMap(this::fetchConfiguration); + .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) // + .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) // + .flatMap(this::fetchConfiguration); } Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) { return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) - .onErrorResume(AppConfig::onErrorResume); + .onErrorResume(AppConfig::onErrorResume); } private static <R> Mono<R> onErrorResume(Throwable trowable) { @@ -152,21 +156,21 @@ public class AppConfig { private Mono<AppConfig> fetchConfiguration(EnvProperties env) { Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) // - .onErrorResume(AppConfig::onErrorResume); + .onErrorResume(AppConfig::onErrorResume); // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the // other ones does not work EnvProperties dmaapEnv = ImmutableEnvProperties.builder() // - .consulHost(env.consulHost()) // - .consulPort(env.consulPort()) // - .cbsName(env.cbsName()) // - .appName(env.appName() + ":dmaap") // - .build(); // + .consulHost(env.consulHost()) // + .consulPort(env.consulPort()) // + .cbsName(env.cbsName()) // + .appName(env.appName() + ":dmaap") // + .build(); // Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv) - .onErrorResume(t -> Mono.just(new JsonObject())); + .onErrorResume(t -> Mono.just(new JsonObject())); return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) // - .onErrorResume(AppConfig::onErrorResume); + .onErrorResume(AppConfig::onErrorResume); } /** @@ -181,7 +185,7 @@ public class AppConfig { try { CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject); setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(), - parser.getFtpesConfig()); + parser.getFtpesConfig()); } catch (DatafileTaskException e) { logger.error("Could not parse configuration {}", e.toString(), e); } @@ -208,11 +212,11 @@ public class AppConfig { } private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration, - Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) { + Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) { if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) { logger.error( - "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}", - consumerConfiguration, publisherConfiguration, ftpesConfig); + "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}", + consumerConfiguration, publisherConfiguration, ftpesConfig); } else { this.dmaapConsumerConfiguration = consumerConfiguration; this.publishingConfiguration = publisherConfiguration; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index 3103af49..d25d7db1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -21,13 +21,14 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; + import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; /** * Parses the cloud configuration. @@ -52,7 +53,7 @@ public class CloudConfigParser { public Map<String, PublisherConfiguration> getDmaapPublisherConfig() throws DatafileTaskException { Iterator<JsonElement> producerCfgs = - toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator(); + toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator(); Map<String, PublisherConfiguration> result = new HashMap<>(); @@ -62,19 +63,18 @@ public class CloudConfigParser { JsonObject feedConfig = getFeedConfig(feedName); PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() // - .publishUrl(getAsString(feedConfig, "publish_url")) // - .passWord(getAsString(feedConfig, "password")) // - .userName(getAsString(feedConfig, "username")) // - .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) // - .trustStorePasswordPath( - getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // - .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) // - .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) // - .enableDmaapCertAuth( - get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // - .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) // - .logUrl(getAsString(feedConfig, "log_url")) // - .build(); + .publishUrl(getAsString(feedConfig, "publish_url")) // + .passWord(getAsString(feedConfig, "password")) // + .userName(getAsString(feedConfig, "username")) // + .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) // + .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // + .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) // + .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) // + .enableDmaapCertAuth( + get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) // + .logUrl(getAsString(feedConfig, "log_url")) // + .build(); result.put(cfg.changeIdentifier(), cfg); } @@ -92,22 +92,21 @@ public class CloudConfigParser { String topicUrl = getAsString(dmaapInfo, "topic_url"); return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl) - .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) - .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) - .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) - .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) - .enableDmaapCertAuth( - get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // - .build(); + .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) + .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) + .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) + .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) + .enableDmaapCertAuth(get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + .build(); } public FtpesConfig getFtpesConfig() throws DatafileTaskException { return new ImmutableFtpesConfig.Builder() // - .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert")) - .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword")) - .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa")) - .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) // - .build(); + .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert")) + .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword")) + .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa")) + .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) // + .build(); } private static JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java index bd8f0c3c..1fd24e98 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java @@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.net.MalformedURLException; import java.net.URL; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -55,23 +56,23 @@ public abstract class ConsumerConfiguration { DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); return new ImmutableDmaapConsumerConfiguration.Builder() // - .dmaapContentType("application/json") // - .dmaapPortNumber(url.getPort()) // - .dmaapHostName(url.getHost()) // - .dmaapTopicName(path.dmaapTopicName) // - .dmaapProtocol(url.getProtocol()) // - .dmaapUserName(userName) // - .dmaapUserPassword(passwd) // - .trustStorePath(this.trustStorePath()) // - .trustStorePasswordPath(this.trustStorePasswordPath()) // - .keyStorePath(this.keyStorePath()) // - .keyStorePasswordPath(this.keyStorePasswordPath()) // - .enableDmaapCertAuth(this.enableDmaapCertAuth()) // - .consumerId(path.consumerId) // - .consumerGroup(path.consumerGroup) // - .timeoutMs(-1) // - .messageLimit(-1) // - .build(); + .dmaapContentType("application/json") // + .dmaapPortNumber(url.getPort()) // + .dmaapHostName(url.getHost()) // + .dmaapTopicName(path.dmaapTopicName) // + .dmaapProtocol(url.getProtocol()) // + .dmaapUserName(userName) // + .dmaapUserPassword(passwd) // + .trustStorePath(this.trustStorePath()) // + .trustStorePasswordPath(this.trustStorePasswordPath()) // + .keyStorePath(this.keyStorePath()) // + .keyStorePasswordPath(this.keyStorePasswordPath()) // + .enableDmaapCertAuth(this.enableDmaapCertAuth()) // + .consumerId(path.consumerId) // + .consumerGroup(path.consumerGroup) // + .timeoutMs(-1) // + .messageLimit(-1) // + .build(); } catch (MalformedURLException e) { throw new DatafileTaskException("Could not parse the URL", e); } @@ -90,12 +91,14 @@ public abstract class ConsumerConfiguration { } private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException { - String[] tokens = urlPath.split("/"); // UrlPath: /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12 + String[] tokens = urlPath.split("/"); // UrlPath: + // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12 if (tokens.length != 5) { throw new DatafileTaskException("The path has incorrect syntax: " + urlPath); } - final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // e.g. /events/unauthenticated.VES_NOTIFICATION_OUTPUT + final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // e.g. + // /events/unauthenticated.VES_NOTIFICATION_OUTPUT final String consumerGroup = tokens[3]; // ex. OpenDcae-c12 final String consumerId = tokens[4]; // ex. C12 return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java index 62af92a8..f3c915b2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -26,7 +26,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuratio import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; - import reactor.core.publisher.Mono; /** @@ -39,7 +38,8 @@ class EnvironmentProcessor { private static final int DEFAULT_CONSUL_PORT = 8500; private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class); - private EnvironmentProcessor() {} + private EnvironmentProcessor() { + } static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> contextMap) { MDC.setContextMap(contextMap); @@ -47,11 +47,11 @@ class EnvironmentProcessor { EnvProperties envProperties; try { envProperties = ImmutableEnvProperties.builder() // - .consulHost(getConsulHost(systemEnvironment)) // - .consulPort(getConsultPort(systemEnvironment)) // - .cbsName(getConfigBindingService(systemEnvironment)) // - .appName(getService(systemEnvironment)) // - .build(); + .consulHost(getConsulHost(systemEnvironment)) // + .consulPort(getConsultPort(systemEnvironment)) // + .cbsName(getConfigBindingService(systemEnvironment)) // + .appName(getService(systemEnvironment)) // + .build(); } catch (EnvironmentLoaderException e) { return Mono.error(e); } @@ -61,27 +61,27 @@ class EnvironmentProcessor { private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException { return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST")) - .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined")); + .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined")); } private static Integer getConsultPort(Properties systemEnvironments) { return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) // - .map(Integer::valueOf) // - .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul); + .map(Integer::valueOf) // + .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul); } private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException { return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) // - .orElseThrow(() -> new EnvironmentLoaderException( - "$CONFIG_BINDING_SERVICE environment has not been defined")); + .orElseThrow( + () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined")); } private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException { return Optional - .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME")) - .orElse(systemEnvironments.getProperty("SERVICE_NAME"))) - .orElseThrow(() -> new EnvironmentLoaderException( - "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment")); + .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME")) + .orElse(systemEnvironments.getProperty("SERVICE_NAME"))) + .orElseThrow(() -> new EnvironmentLoaderException( + "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment")); } private static Integer getDefaultPortOfConsul() { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java index 844699eb..e12365e4 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java @@ -26,7 +26,6 @@ import org.immutables.gson.Gson; import org.immutables.value.Value; import org.springframework.stereotype.Component; - @Component @Value.Immutable @Value.Style(builder = "new", redactedMask = "####") diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java index 5576ed26..3c3a7625 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java @@ -24,7 +24,6 @@ import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; - @Value.Immutable @Value.Style(redactedMask = "####") @Gson.TypeAdapters @@ -56,19 +55,19 @@ public interface PublisherConfiguration { String urlPath = url.getPath(); return new ImmutableDmaapPublisherConfiguration.Builder() // - .dmaapContentType("application/octet-stream") // - .dmaapPortNumber(url.getPort()) // - .dmaapHostName(url.getHost()) // - .dmaapTopicName(urlPath) // - .dmaapProtocol(url.getProtocol()) // - .dmaapUserName(this.userName()) // - .dmaapUserPassword(this.passWord()) // - .trustStorePath(this.trustStorePath()) // - .trustStorePasswordPath(this.trustStorePasswordPath()) // - .keyStorePath(this.keyStorePath()) // - .keyStorePasswordPath(this.keyStorePasswordPath()) // - .enableDmaapCertAuth(this.enableDmaapCertAuth()) // - .build(); + .dmaapContentType("application/octet-stream") // + .dmaapPortNumber(url.getPort()) // + .dmaapHostName(url.getHost()) // + .dmaapTopicName(urlPath) // + .dmaapProtocol(url.getProtocol()) // + .dmaapUserName(this.userName()) // + .dmaapUserPassword(this.passWord()) // + .trustStorePath(this.trustStorePath()) // + .trustStorePasswordPath(this.trustStorePasswordPath()) // + .keyStorePath(this.keyStorePath()) // + .keyStorePasswordPath(this.keyStorePasswordPath()) // + .enableDmaapCertAuth(this.enableDmaapCertAuth()) // + .build(); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index e0362373..3a0c559e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -17,6 +17,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import io.swagger.annotations.ApiOperation; + import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -24,7 +25,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; + import javax.annotation.PostConstruct; + import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.slf4j.Logger; @@ -66,8 +69,7 @@ public class SchedulerConfig { * @param configuration The DFC configuration. */ @Autowired - public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks, - AppConfig configuration) { + public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks, AppConfig configuration) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTasks; this.configuration = configuration; @@ -103,10 +105,9 @@ public class SchedulerConfig { if (scheduledFutureList.isEmpty()) { scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask, - SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); - scheduledFutureList - .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), - SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); + SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); + scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay( + () -> scheduledTask.purgeCachedInformation(Instant.now()), SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); return true; } else { return false; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java index 5a78261a..1990efb8 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java @@ -19,11 +19,13 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.common.base.Predicates; + import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; + import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.PathSelectors; import springfox.documentation.builders.RequestHandlerSelectors; @@ -32,7 +34,6 @@ import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; - @EnableSwagger2 @Configuration @Profile("prod") @@ -53,29 +54,29 @@ public class SwaggerConfig extends WebMvcConfigurationSupport { @Bean public Docket api() { return new Docket(DocumentationType.SWAGGER_2) // - .apiInfo(apiInfo()) // - .select() // - .apis(RequestHandlerSelectors.any()) // - .paths(PathSelectors.any()) // - .paths(Predicates.not(PathSelectors.regex("/error"))) // - // this endpoint is not implemented, but was visible for Swagger - .build(); + .apiInfo(apiInfo()) // + .select() // + .apis(RequestHandlerSelectors.any()) // + .paths(PathSelectors.any()) // + .paths(Predicates.not(PathSelectors.regex("/error"))) // + // this endpoint is not implemented, but was visible for Swagger + .build(); } private static ApiInfo apiInfo() { return new ApiInfoBuilder() // - .title(API_TITLE) // - .description(DESCRIPTION) // - .version(VERSION) // - .build(); + .title(API_TITLE) // + .description(DESCRIPTION) // + .version(VERSION) // + .build(); } @Override protected void addResourceHandlers(ResourceHandlerRegistry registry) { registry.addResourceHandler(SWAGGER_UI) // - .addResourceLocations(RESOURCES_PATH); + .addResourceLocations(RESOURCES_PATH); registry.addResourceHandler(WEBJARS) // - .addResourceLocations(WEBJARS_PATH); + .addResourceLocations(WEBJARS_PATH); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index 791f0cf1..16cd05de 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.collectors.datafile.controllers; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; + import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.slf4j.Logger; @@ -71,7 +72,7 @@ public class ScheduleController { public Mono<ResponseEntity<String>> startTasks() { return Mono.fromSupplier(schedulerConfig::tryToStartTask) // - .map(ScheduleController::createStartTaskResponse); + .map(ScheduleController::createStartTaskResponse); } /** @@ -84,7 +85,7 @@ public class ScheduleController { public Mono<ResponseEntity<String>> stopTask(@RequestHeader HttpHeaders headers) { MappedDiagnosticContext.initializeTraceContext(headers); logger.info(MappedDiagnosticContext.ENTRY, "Stop request"); - Mono<ResponseEntity<String>> response = schedulerConfig.getResponseFromCancellationOfTasks(); + Mono<ResponseEntity<String>> response = schedulerConfig.getResponseFromCancellationOfTasks(); logger.info(MappedDiagnosticContext.EXIT, "Stop request"); return response; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java index 40f9d99b..5b72df1a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java @@ -19,12 +19,12 @@ package org.onap.dcaegen2.collectors.datafile.controllers; import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.ENTRY; import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.EXIT; -import org.onap.dcaegen2.collectors.datafile.model.Counters; - import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; + +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.slf4j.Logger; @@ -61,7 +61,8 @@ public class StatusController { */ @GetMapping("/heartbeat") @ApiOperation(value = "Returns liveness of DATAFILE service") - @ApiResponses(value = { // + @ApiResponses( + value = { // @ApiResponse(code = 200, message = "DATAFILE service is living"), @ApiResponse(code = 401, message = "You are not authorized to view the resource"), @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), @@ -85,7 +86,8 @@ public class StatusController { */ @GetMapping("/status") @ApiOperation(value = "Returns status and statistics of DATAFILE service") - @ApiResponses(value = { // + @ApiResponses( + value = { // @ApiResponse(code = 200, message = "DATAFILE service is living"), @ApiResponse(code = 401, message = "You are not authorized to view the resource"), @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), @@ -100,6 +102,4 @@ public class StatusController { return response; } - - } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java index a4bdd667..5c2a0d2f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java @@ -18,7 +18,6 @@ package org.onap.dcaegen2.collectors.datafile.exceptions; - public class NonRetryableDatafileTaskException extends DatafileTaskException { private static final long serialVersionUID = 1L; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java index c35a5a1d..d74b10a2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java @@ -17,6 +17,7 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import java.nio.file.Path; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; /** @@ -24,7 +25,7 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; * * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -public interface FileCollectClient extends AutoCloseable { +public interface FileCollectClient extends AutoCloseable { public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException; public void open() throws DatafileTaskException; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index 18288603..2d126ff8 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -72,7 +72,7 @@ public class FtpsClient implements FileCollectClient { * @param trustedCaPassword password for the PNF's trusted keystore. */ public FtpsClient(FileServerData fileServerData, String keyCertPath, String keyCertPassword, Path trustedCaPath, - String trustedCaPassword) { + String trustedCaPassword) { this.fileServerData = fileServerData; this.keyCertPath = keyCertPath; this.keyCertPassword = keyCertPassword; @@ -120,7 +120,8 @@ public class FtpsClient implements FileCollectClient { try (OutputStream output = createOutputStream(localFileName)) { logger.trace("begin to retrieve from xNF."); if (!realFtpsClient.retrieveFile(remoteFileName, output)) { - throw new NonRetryableDatafileTaskException("Could not retrieve file. No retry attempts will be done, file :" + remoteFileName); + throw new NonRetryableDatafileTaskException( + "Could not retrieve file. No retry attempts will be done, file :" + remoteFileName); } } catch (IOException e) { throw new DatafileTaskException("Could not fetch file: " + e, e); @@ -151,14 +152,14 @@ public class FtpsClient implements FileCollectClient { realFtpsClient.setBufferSize(1024 * 1024); } else { throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress() - + " xNF reply code: " + realFtpsClient.getReplyCode()); + + " xNF reply code: " + realFtpsClient.getReplyCode()); } logger.trace("setUpConnection successfully!"); } private TrustManager createTrustManager(Path trustedCaPath, String trustedCaPassword) - throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException { + throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException { logger.trace("Creating trust manager from file: {}", trustedCaPath); try (InputStream fis = createInputStream(trustedCaPath)) { KeyStore keyStore = KeyStore.getInstance("JKS"); @@ -185,7 +186,7 @@ public class FtpsClient implements FileCollectClient { } protected TrustManager getTrustManager(Path trustedCaPath, String trustedCaPassword) - throws KeyStoreException, NoSuchAlgorithmException, IOException, CertificateException { + throws KeyStoreException, NoSuchAlgorithmException, IOException, CertificateException { synchronized (FtpsClient.class) { if (theTrustManager == null) { theTrustManager = createTrustManager(trustedCaPath, trustedCaPassword); @@ -195,7 +196,7 @@ public class FtpsClient implements FileCollectClient { } protected KeyManager createKeyManager(String keyCertPath, String keyCertPassword) - throws IOException, GeneralSecurityException { + throws IOException, GeneralSecurityException { return KeyManagerUtils.createClientKeyManager(new File(keyCertPath), keyCertPassword); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java index 585dd775..e5ca9351 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java @@ -30,8 +30,7 @@ public enum Scheme { FTPS, SFTP; public static final String DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG = "DFC does not support protocol "; - public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE = - ". Supported protocols are FTPES, FTPS, and SFTP"; + public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE = ". Supported protocols are FTPES, FTPS, and SFTP"; /** * Get a <code>Scheme</code> from a string. @@ -47,8 +46,8 @@ public enum Scheme { } else if ("SFTP".equalsIgnoreCase(schemeString)) { result = Scheme.SFTP; } else { - throw new DatafileTaskException(DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + schemeString - + SUPPORTED_PROTOCOLS_ERROR_MESSAGE); + throw new DatafileTaskException( + DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + schemeString + SUPPORTED_PROTOCOLS_ERROR_MESSAGE); } return result; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index f5d10d34..e11cd76b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -22,8 +22,10 @@ import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.jcraft.jsch.SftpException; + import java.nio.file.Path; import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; @@ -57,12 +59,12 @@ public class SftpClient implements FileCollectClient { logger.trace("File {} Download Successfull from xNF", localFile.getFileName()); } catch (SftpException e) { boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED - && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED; + && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED; if (retry) { throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e); } else { throw new NonRetryableDatafileTaskException( - "Unable to get file from xNF. No retry attempts will be done. Data: " + fileServerData, e); + "Unable to get file from xNF. No retry attempts will be done. Data: " + fileServerData, e); } } @@ -95,7 +97,7 @@ public class SftpClient implements FileCollectClient { throw new DatafileTaskException("Could not open Sftp client. " + e); } else { throw new NonRetryableDatafileTaskException( - "Could not open Sftp client, no retry attempts will be done " + e); + "Could not open Sftp client, no retry attempts will be done " + e); } } } @@ -107,8 +109,8 @@ public class SftpClient implements FileCollectClient { private Session setUpSession(FileServerData fileServerData) throws JSchException { JSch jsch = createJsch(); - Session newSession = jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), - getPort(fileServerData.port())); + Session newSession = + jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port())); newSession.setConfig("StrictHostKeyChecking", "no"); newSession.setPassword(fileServerData.password()); newSession.connect(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java index f6f93d49..b0037271 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java @@ -22,6 +22,7 @@ package org.onap.dcaegen2.collectors.datafile.http; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; + import org.apache.http.client.RedirectStrategy; import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java index 5efbe37a..4c42284e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java @@ -23,7 +23,6 @@ package org.onap.dcaegen2.collectors.datafile.model; import java.time.Instant; import java.util.concurrent.atomic.AtomicInteger; - /** * * Various counters that can be shown via a REST API. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 36aae949..8cafd0c4 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -108,9 +108,9 @@ public abstract class FileData { URI uri = URI.create(location()); Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder() // - .serverAddress(uri.getHost()) // - .userId(userInfo.isPresent() ? userInfo.get()[0] : "") // - .password(userInfo.isPresent() ? userInfo.get()[1] : ""); + .serverAddress(uri.getHost()) // + .userId(userInfo.isPresent() ? userInfo.get()[0] : "") // + .password(userInfo.isPresent() ? userInfo.get()[1] : ""); if (uri.getPort() > 0) { builder.port(uri.getPort()); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java index 5b8c015e..34f2ac0b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java @@ -34,7 +34,7 @@ import org.immutables.value.Value; @Value.Immutable @Gson.TypeAdapters @Value.Style(redactedMask = "####") -public interface FilePublishInformation { +public interface FilePublishInformation { @SerializedName("productName") String getProductName(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java index 6cc6da6e..647330fe 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java @@ -21,6 +21,7 @@ package org.onap.dcaegen2.collectors.datafile.model; import java.util.List; + import org.immutables.gson.Gson; import org.immutables.value.Value; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java index efd69a09..6f6ef004 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/JsonSerializer.java @@ -23,6 +23,7 @@ import com.google.gson.ExclusionStrategy; import com.google.gson.FieldAttributes; import com.google.gson.Gson; import com.google.gson.GsonBuilder; + import java.util.Set; /** @@ -31,11 +32,12 @@ import java.util.Set; public abstract class JsonSerializer { private static Gson gson = new GsonBuilder() // - .serializeNulls() // - .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) // - .create(); // + .serializeNulls() // + .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) // + .create(); // - private JsonSerializer() {} + private JsonSerializer() { + } /** * Serializes a <code>filePublishInformation</code>. @@ -53,8 +55,8 @@ public abstract class JsonSerializer { * Elements in FilePublishInformation to include in the file publishing Json string. */ private final Set<String> inclusions = - Sets.newHashSet("productName", "vendorName", "lastEpochMicrosec", "sourceName", "startEpochMicrosec", - "timeZoneOffset", "location", "compression", "fileFormatType", "fileFormatVersion"); + Sets.newHashSet("productName", "vendorName", "lastEpochMicrosec", "sourceName", "startEpochMicrosec", + "timeZoneOffset", "location", "compression", "fileFormatType", "fileFormatVersion"); @Override public boolean shouldSkipField(FieldAttributes fieldAttributes) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java index 86ea5c3e..72d18437 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java @@ -40,7 +40,8 @@ public final class MappedDiagnosticContext { private static final Logger logger = LoggerFactory.getLogger(MappedDiagnosticContext.class); - private MappedDiagnosticContext() {} + private MappedDiagnosticContext() { + } /** * Inserts the relevant trace information in the HTTP header. @@ -78,6 +79,7 @@ public final class MappedDiagnosticContext { /** * Initialize the MDC when a new context is started. + * * @return a copy of the new trace context */ public static Map<String, String> initializeTraceContext() { @@ -88,6 +90,7 @@ public final class MappedDiagnosticContext { /** * Updates the request ID in the current context. + * * @param newRequestId the new value of the request ID * @return a copy the updated context */ diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java index 5f5ccddf..2ad96e8d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java @@ -28,7 +28,6 @@ import org.springframework.http.HttpHeaders; import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.Builder; - import reactor.core.publisher.Mono; /** @@ -86,7 +85,7 @@ public class DmaapWebClient { MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url())); logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url()); clientRequest.headers() - .forEach((name, values) -> values.forEach(value -> logger.trace("{}={}", name, value))); + .forEach((name, values) -> values.forEach(value -> logger.trace("{}={}", name, value))); logger.trace("HTTP request headers: {}", clientRequest.headers()); MDC.remove(SERVICE_NAME); return Mono.just(clientRequest); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 5e02ecdd..abed645a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -22,11 +22,13 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; + import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; + import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -37,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -109,8 +112,7 @@ public class JsonMessageParser { private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) { return createMessages(Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) - .orElseGet(JsonObject::new)))); + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray).orElseGet(JsonObject::new)))); } /** @@ -121,7 +123,7 @@ public class JsonMessageParser { */ private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) { return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject())) - : getMessagesFromJsonArray(jsonElement); + : getMessagesFromJsonArray(jsonElement); } private static Mono<JsonElement> getJsonParserMessage(String message) { @@ -130,10 +132,9 @@ public class JsonMessageParser { private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP) - : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); + : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); } - private static Mono<FileReadyMessage> transformMessages(JsonObject message) { Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { @@ -144,8 +145,8 @@ public class JsonMessageParser { List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap, messageMetaData); if (!allFileDataFromJson.isEmpty()) { return Mono.just(ImmutableFileReadyMessage.builder() // - .files(allFileDataFromJson) // - .build()); + .files(allFileDataFromJson) // + .build()); } else { return Mono.empty(); } @@ -158,7 +159,6 @@ public class JsonMessageParser { return Mono.empty(); } - private static Optional<MessageMetaData> getMessageMetaData(JsonObject message) { List<String> missingValues = new ArrayList<>(); JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); @@ -173,15 +173,15 @@ public class JsonMessageParser { getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues); MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // - .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) // - .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) // - .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) // - .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) // - .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) // - .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) // - .changeIdentifier(changeIdentifier) // - .changeType(changeType) // - .build(); + .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) // + .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) // + .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) // + .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) // + .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) // + .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) // + .changeIdentifier(changeIdentifier) // + .changeType(changeType) // + .build(); if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) { return Optional.of(messageMetaData); } else { @@ -203,7 +203,7 @@ public class JsonMessageParser { } private static List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, - MessageMetaData messageMetaData) { + MessageMetaData messageMetaData) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); @@ -225,7 +225,7 @@ public class JsonMessageParser { String location = getValueFromJson(data, LOCATION, missingValues); if (StringUtils.isEmpty(location)) { logger.error(ERROR_MSG_VES_EVENT_PARSING + "File information wrong. Missing location. Data: {} {}", - messageMetaData, fileInfo); + messageMetaData, fileInfo); return Optional.empty(); } Scheme scheme; @@ -233,23 +233,23 @@ public class JsonMessageParser { scheme = Scheme.getSchemeFromString(URI.create(location).getScheme()); } catch (Exception e) { logger.error(ERROR_MSG_VES_EVENT_PARSING + "{}. Location: {} Data: {}", e.getMessage(), location, - messageMetaData, e); + messageMetaData, e); return Optional.empty(); } FileData fileData = ImmutableFileData.builder() // - .name(getValueFromJson(fileInfo, NAME, missingValues)) // - .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) // - .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) // - .location(location) // - .scheme(scheme) // - .compression(getValueFromJson(data, COMPRESSION, missingValues)) // - .messageMetaData(messageMetaData) // - .build(); + .name(getValueFromJson(fileInfo, NAME, missingValues)) // + .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) // + .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) // + .location(location) // + .scheme(scheme) // + .compression(getValueFromJson(data, COMPRESSION, missingValues)) // + .messageMetaData(messageMetaData) // + .build(); if (missingValues.isEmpty()) { return Optional.of(fileData); } logger.error(ERROR_MSG_VES_EVENT_PARSING + "File information wrong. Missing data: {} Data: {}", missingValues, - fileInfo); + fileInfo); return Optional.empty(); } @@ -263,16 +263,15 @@ public class JsonMessageParser { * @return String of data from event name */ private static String getDataFromEventName(EventNameDataType dataType, String eventName, - List<String> missingValues) { + List<String> missingValues) { String[] eventArray = eventName.split("_|-"); if (eventArray.length >= 4) { return eventArray[dataType.index]; } else { missingValues.add(dataType.toString()); logger.error( - ERROR_MSG_VES_EVENT_PARSING - + "Can not get {} from eventName, eventName is not in correct format: {}", - dataType, eventName); + ERROR_MSG_VES_EVENT_PARSING + "Can not get {} from eventName, eventName is not in correct format: {}", + dataType, eventName); } return ""; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java index 475e0224..91b74042 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -18,7 +18,6 @@ package org.onap.dcaegen2.collectors.datafile.service; import java.nio.file.Path; import java.time.Instant; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java index 198c1bf1..a46e17ba 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java @@ -83,7 +83,7 @@ public class DmaapProducerHttpClient { * @throws DatafileTaskException if anything goes wrong. */ public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap) - throws DatafileTaskException { + throws DatafileTaskException { MDC.setContextMap(contextMap); try (CloseableHttpAsyncClient webClient = createWebClient(true, DEFAULT_REQUEST_TIMEOUT, contextMap)) { webClient.start(); @@ -110,7 +110,7 @@ public class DmaapProducerHttpClient { * @throws DatafileTaskException if anything goes wrong. */ public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, Duration requestTimeout, - Map<String, String> contextMap) throws DatafileTaskException { + Map<String, String> contextMap) throws DatafileTaskException { MDC.setContextMap(contextMap); try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout, contextMap)) { webClient.start(); @@ -140,13 +140,13 @@ public class DmaapProducerHttpClient { } private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout, - Map<String, String> contextMap) throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { + Map<String, String> contextMap) throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { SSLContext sslContext = - new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); + new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); HttpAsyncClientBuilderWrapper clientBuilder = getHttpClientBuilder(); clientBuilder.setSslContext(sslContext) // - .setSslHostnameVerifier(new NoopHostnameVerifier()); + .setSslHostnameVerifier(new NoopHostnameVerifier()); if (expectRedirect) { clientBuilder.setRedirectStrategy(new PublishRedirectStrategy(contextMap)); @@ -155,10 +155,10 @@ public class DmaapProducerHttpClient { if (requestTimeout.toMillis() > 0) { int millis = (int) requestTimeout.toMillis(); RequestConfig requestConfig = RequestConfig.custom() // - .setSocketTimeout(millis) // - .setConnectTimeout(millis) // - .setConnectionRequestTimeout(millis) // - .build(); + .setSocketTimeout(millis) // + .setConnectTimeout(millis) // + .setConnectionRequestTimeout(millis) // + .build(); clientBuilder.setDefaultRequestConfig(requestConfig); } else { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index f1d33454..081c7f39 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -51,7 +51,7 @@ public class DMaaPMessageConsumer { } protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - JsonMessageParser messageParser) { + JsonMessageParser messageParser) { this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.jsonMessageParser = messageParser; } @@ -72,7 +72,7 @@ public class DMaaPMessageConsumer { } private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) - throws DatafileTaskException { + throws DatafileTaskException { DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap(); WebClient client = new DmaapWebClient().fromConfiguration(config).build(); return new DMaaPConsumerReactiveHttpClient(config, client); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 02e153cf..bfd3f3e3 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -22,12 +22,14 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; import com.google.gson.JsonParser; + import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; + import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ContentType; @@ -79,13 +81,13 @@ public class DataRouterPublisher { * @return the (same) filePublishInformation */ public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries, - Duration firstBackoff) { + Duration firstBackoff) { MDC.setContextMap(publishInfo.getContext()); return Mono.just(publishInfo) // - .cache() // - .flatMap(this::publishFile) // - .flatMap(httpStatus -> handleHttpResponse(httpStatus, publishInfo)) // - .retryBackoff(numRetries, firstBackoff); + .cache() // + .flatMap(this::publishFile) // + .flatMap(httpStatus -> handleHttpResponse(httpStatus, publishInfo)) // + .retryBackoff(numRetries, firstBackoff); } private Mono<HttpStatus> publishFile(FilePublishInformation publishInfo) { @@ -99,7 +101,7 @@ public class DataRouterPublisher { dmaapProducerHttpClient.addUserCredentialsToHead(put); HttpResponse response = - dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); + dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); logger.trace("{}", response); counters.incTotalPublishedFiles(); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); @@ -116,10 +118,10 @@ public class DataRouterPublisher { JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo)); put.addHeader(X_DMAAP_DR_META, metaData.toString()); URI uri = new DefaultUriBuilderFactory( - datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) // - .builder() // - .pathSegment(publishInfo.getName()) // - .build(); + datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) // + .builder() // + .pathSegment(publishInfo.getName()) // + .build(); put.setURI(uri); MappedDiagnosticContext.appendTraceInfo(put); @@ -132,7 +134,7 @@ public class DataRouterPublisher { } private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, - FilePublishInformation publishInfo) { + FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publishing file {} to DR successful!", publishInfo.getName()); @@ -140,7 +142,7 @@ public class DataRouterPublisher { } else { logger.warn("Publishing file {} to DR unsuccessful. Response code: {}", publishInfo.getName(), response); return Mono.error(new Exception( - "Publishing file " + publishInfo.getName() + " to DR unsuccessful. Response code: " + response)); + "Publishing file " + publishInfo.getName() + " to DR unsuccessful. Response code: " + response)); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 311f752d..20bf599b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -21,6 +21,7 @@ import java.nio.file.Paths; import java.time.Duration; import java.util.Map; import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -70,15 +71,15 @@ public class FileCollector { * @return the data needed to publish the file. */ public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff, - Map<String, String> contextMap) { + Map<String, String> contextMap) { MDC.setContextMap(contextMap); logger.trace("Entering collectFile with {}", fileData); return Mono.just(fileData) // - .cache() // - .flatMap(fd -> tryCollectFile(fileData, contextMap)) // - .retryBackoff(numRetries, firstBackoff) // - .flatMap(FileCollector::checkCollectedFile); + .cache() // + .flatMap(fd -> tryCollectFile(fileData, contextMap)) // + .retryBackoff(numRetries, firstBackoff) // + .flatMap(FileCollector::checkCollectedFile); } private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) { @@ -105,7 +106,7 @@ public class FileCollector { return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } catch (DatafileTaskException e) { logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), - e.toString()); + e.toString()); counters.incNoOfFailedFtpAttempts(); if (e instanceof NonRetryableDatafileTaskException) { return Mono.just(Optional.empty()); // Give up @@ -114,7 +115,7 @@ public class FileCollector { } } catch (Exception throwable) { logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), - throwable.toString(), throwable); + throwable.toString(), throwable); return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } } @@ -131,25 +132,25 @@ public class FileCollector { } private static FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile, - Map<String, String> context) { + Map<String, String> context) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); return ImmutableFilePublishInformation.builder() // - .productName(metaData.productName()) // - .vendorName(metaData.vendorName()) // - .lastEpochMicrosec(metaData.lastEpochMicrosec()) // - .sourceName(metaData.sourceName()) // - .startEpochMicrosec(metaData.startEpochMicrosec()) // - .timeZoneOffset(metaData.timeZoneOffset()) // - .name(fileData.name()) // - .location(location) // - .internalLocation(localFile) // - .compression(fileData.compression()) // - .fileFormatType(fileData.fileFormatType()) // - .fileFormatVersion(fileData.fileFormatVersion()) // - .changeIdentifier(fileData.messageMetaData().changeIdentifier()) // - .context(context) // - .build(); + .productName(metaData.productName()) // + .vendorName(metaData.vendorName()) // + .lastEpochMicrosec(metaData.lastEpochMicrosec()) // + .sourceName(metaData.sourceName()) // + .startEpochMicrosec(metaData.startEpochMicrosec()) // + .timeZoneOffset(metaData.timeZoneOffset()) // + .name(fileData.name()) // + .location(location) // + .internalLocation(localFile) // + .compression(fileData.compression()) // + .fileFormatType(fileData.fileFormatType()) // + .fileFormatVersion(fileData.fileFormatVersion()) // + .changeIdentifier(fileData.messageMetaData().changeIdentifier()) // + .context(context) // + .build(); } protected SftpClient createSftpClient(FileData fileData) { @@ -159,6 +160,6 @@ public class FileCollector { protected FtpsClient createFtpsClient(FileData fileData) { FtpesConfig config = datafileAppConfig.getFtpesConfiguration(); return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(), - Paths.get(config.trustedCa()), config.trustedCaPassword()); + Paths.get(config.trustedCa()), config.trustedCaPassword()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java index 4d8d679d..037803bd 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -73,7 +73,7 @@ public class PublishedChecker { * @throws DatafileTaskException if the check fails */ public boolean isFilePublished(String fileName, String changeIdentifier, Map<String, String> contextMap) - throws DatafileTaskException { + throws DatafileTaskException { MDC.setContextMap(contextMap); PublisherConfiguration publisherConfig = resolveConfiguration(changeIdentifier); @@ -86,8 +86,8 @@ public class PublishedChecker { getRequest.setURI(getPublishedQueryUri(fileName, publisherConfig)); producerClient.addUserCredentialsToHead(getRequest); - HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, - WEB_CLIENT_TIMEOUT, contextMap); + HttpResponse response = + producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap); logger.trace("{}", response); int status = response.getStatusLine().getStatusCode(); @@ -104,9 +104,9 @@ public class PublishedChecker { private static URI getPublishedQueryUri(String fileName, PublisherConfiguration config) throws URISyntaxException { return new URIBuilder(config.logUrl()) // - .addParameter("type", "pub") // - .addParameter("filename", fileName) // - .build(); + .addParameter("type", "pub") // + .addParameter("filename", fileName) // + .build(); } protected PublisherConfiguration resolveConfiguration(String changeIdentifier) throws DatafileTaskException { @@ -114,7 +114,7 @@ public class PublishedChecker { } protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) - throws DatafileTaskException { + throws DatafileTaskException { try { return new DmaapProducerHttpClient(publisherConfig.toDmaap()); } catch (MalformedURLException e) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 300ca601..bc73ddb2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.Counters; @@ -35,12 +36,12 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; - /** * This implements the main flow of the data file collector. Fetch file ready events from the * message router, fetch new files from the PNF publish these in the data router. @@ -84,10 +85,10 @@ public class ScheduledTasks { try { if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING || this.threadPoolQueueSize.get() > 0) { logger.info( - "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, " - + "published files: {}, number of queued VES events: {}", - getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(), - threadPoolQueueSize.get()); + "Skipping consuming new files; current number of tasks: {}, number of subscriptions: {}, " + + "published files: {}, number of queued VES events: {}", + getCurrentNumberOfTasks(), this.currentNumberOfSubscriptions.get(), publishedFilesCache.size(), + threadPoolQueueSize.get()); return; } if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) { @@ -99,15 +100,15 @@ public class ScheduledTasks { Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); logger.trace("Execution of tasks was registered"); createMainTask(context) // - .subscribe(ScheduledTasks::onSuccess, // - throwable -> { - onError(throwable, context); - currentNumberOfSubscriptions.decrementAndGet(); - }, // - () -> { - onComplete(context); - currentNumberOfSubscriptions.decrementAndGet(); - }); + .subscribe(ScheduledTasks::onSuccess, // + throwable -> { + onError(throwable, context); + currentNumberOfSubscriptions.decrementAndGet(); + }, // + () -> { + onComplete(context); + currentNumberOfSubscriptions.decrementAndGet(); + }); } catch (Exception e) { logger.error("Unexpected exception: {}", e.toString(), e); } @@ -115,21 +116,21 @@ public class ScheduledTasks { Flux<FilePublishInformation> createMainTask(Map<String, String> context) { return fetchMoreFileReadyMessages() // - .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) // - .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) // - .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread - .runOn(scheduler) // - .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // - .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // - .flatMap(fileData -> createMdcContext(fileData, context)) // - .filter(this::isFeedConfigured) // - .filter(this::shouldBePublished) // - .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .flatMap(this::fetchFile, false, 1, 1) // - .flatMap(this::publishToDataRouter, false, 1, 1) // - .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) // - .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) // - .sequential(); + .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) // + .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) // + .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread + .runOn(scheduler) // + .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // + .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // + .flatMap(fileData -> createMdcContext(fileData, context)) // + .filter(this::isFeedConfigured) // + .filter(this::shouldBePublished) // + .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // + .flatMap(this::fetchFile, false, 1, 1) // + .flatMap(this::publishToDataRouter, false, 1, 1) // + .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) // + .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) // + .sequential(); } private class FileDataWithContext { @@ -157,7 +158,6 @@ public class ScheduledTasks { return this.counters; } - protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { return new DMaaPMessageConsumer(this.applicationConfiguration); } @@ -213,7 +213,7 @@ public class ScheduledTasks { return true; } else { logger.info("No feed is configured for: {}, file ignored: {}", - fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name()); + fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name()); return false; } } @@ -223,7 +223,7 @@ public class ScheduledTasks { if (publishedFilesCache.put(localFilePath) == null) { try { boolean result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), - fileData.fileData.messageMetaData().changeIdentifier(), fileData.context); + fileData.fileData.messageMetaData().changeIdentifier(), fileData.context); return result; } catch (DatafileTaskException e) { logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e); @@ -237,9 +237,9 @@ public class ScheduledTasks { private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) { MDC.setContextMap(fileData.context); return createFileCollector() // - .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, - fileData.context) // - .onErrorResume(exception -> handleFetchFileFailure(fileData)); + .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, + fileData.context) // + .onErrorResume(exception -> handleFetchFileFailure(fileData)); } private Mono<FilePublishInformation> handleFetchFileFailure(FileDataWithContext fileData) { @@ -257,8 +257,8 @@ public class ScheduledTasks { MDC.setContextMap(publishInfo.getContext()); return createDataRouterPublisher() - .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT) - .onErrorResume(exception -> handlePublishFailure(publishInfo)); + .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT) + .onErrorResume(exception -> handlePublishFailure(publishInfo)); } private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation publishInfo) { @@ -277,15 +277,15 @@ public class ScheduledTasks { */ Flux<FileReadyMessage> fetchMoreFileReadyMessages() { logger.info( - "Consuming new file ready messages, current number of tasks: {}, published files: {}, " - + "number of subscriptions: {}", - getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); + "Consuming new file ready messages, current number of tasks: {}, published files: {}, " + + "number of subscriptions: {}", + getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); Map<String, String> context = MDC.getCopyOfContextMap(); try { return createConsumerTask() // - .getMessageRouterResponse() // - .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); + .getMessageRouterResponse() // + .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); } catch (Exception e) { logger.error("Could not create message consumer task", e); return Flux.empty(); @@ -295,7 +295,7 @@ public class ScheduledTasks { private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) { MDC.setContextMap(context); logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), - this.applicationConfiguration.getDmaapConsumerConfiguration()); + this.applicationConfiguration.getDmaapConsumerConfiguration()); return Flux.empty(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java index 9dcf133a..cc473a78 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java @@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.web; import java.net.URI; import java.util.Map; + import org.apache.http.HttpRequest; import org.apache.http.HttpResponse; import org.apache.http.ProtocolException; @@ -51,11 +52,11 @@ public class PublishRedirectStrategy extends DefaultRedirectStrategy { * Redirectable methods. */ private static final String[] REDIRECT_METHODS = new String[] { // - HttpPut.METHOD_NAME, // - HttpGet.METHOD_NAME, // - HttpPost.METHOD_NAME, // - HttpHead.METHOD_NAME, // - HttpDelete.METHOD_NAME // + HttpPut.METHOD_NAME, // + HttpGet.METHOD_NAME, // + HttpPost.METHOD_NAME, // + HttpHead.METHOD_NAME, // + HttpDelete.METHOD_NAME // }; /** @@ -79,7 +80,7 @@ public class PublishRedirectStrategy extends DefaultRedirectStrategy { @Override public HttpUriRequest getRedirect(final HttpRequest request, final HttpResponse response, final HttpContext context) - throws ProtocolException { + throws ProtocolException { MDC.setContextMap(contextMap); final URI uri = getLocationURI(request, response, context); logger.trace("getRedirect...: {}", request); |