diff options
Diffstat (limited to 'datafile-app-server/src/main/java')
11 files changed, 151 insertions, 79 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index 8e15deb7..e9d84640 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -22,7 +22,6 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; - import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -32,10 +31,8 @@ import java.time.Duration; import java.util.Map; import java.util.Properties; import java.util.ServiceLoader; - import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; - import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; @@ -49,7 +46,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; - import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -69,12 +65,12 @@ public class AppConfig { private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); private ConsumerConfiguration dmaapConsumerConfiguration; - private Map<String, PublisherConfiguration> publishingConfiguration; + Map<String, PublisherConfiguration> publishingConfigurations; private FtpesConfig ftpesConfiguration; private CloudConfigurationProvider cloudConfigurationProvider; @Value("#{systemEnvironment}") Properties systemEnvironment; - private Disposable refreshConfigTask = null; + Disposable refreshConfigTask = null; @NotEmpty private String filepath; @@ -104,6 +100,9 @@ public class AppConfig { () -> logger.error("Configuration refresh terminated")); } + /** + * Stops the refreshing of the configuration. + */ public void stop() { if (refreshConfigTask != null) { refreshConfigTask.dispose(); @@ -115,17 +114,33 @@ public class AppConfig { return dmaapConsumerConfiguration; } + /** + * Checks if there is a configuration for the given feed. + * + * @param changeIdentifier the change identifier the feed is configured to belong to. + * + * @return true if a feed is configured for the given change identifier, false if not. + */ public synchronized boolean isFeedConfigured(String changeIdentifier) { - return publishingConfiguration.containsKey(changeIdentifier); + return publishingConfigurations.containsKey(changeIdentifier); } + /** + * Gets the feed configuration for the given change identifier. + * + * @param changeIdentifier the change identifier the feed is configured to belong to. + * @return the <code>PublisherConfiguration</code> for the feed belonging to the given change identifier. + * + * @throws DatafileTaskException if no configuration has been loaded or the configuration is missing for the given + * change identifier. + */ public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier) throws DatafileTaskException { - if (publishingConfiguration == null) { + if (publishingConfigurations == null) { throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier); } - PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier); + PublisherConfiguration cfg = publishingConfigurations.get(changeIdentifier); if (cfg == null) { throw new DatafileTaskException( "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier); @@ -184,7 +199,7 @@ public class AppConfig { private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) { try { CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject); - setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(), + setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(), parser.getFtpesConfig()); } catch (DatafileTaskException e) { logger.error("Could not parse configuration {}", e.toString(), e); @@ -212,14 +227,13 @@ public class AppConfig { } private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration, - Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) { - if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) { - logger.error( - "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}", - consumerConfiguration, publisherConfiguration, ftpesConfig); + Map<String, PublisherConfiguration> publisherConfigurations, FtpesConfig ftpesConfig) { + if (consumerConfiguration == null || publisherConfigurations == null || ftpesConfig == null) { + logger.error("Problem with consumerConfiguration: {}, publisherConfigurations: {}, ftpesConfig: {}", + consumerConfiguration, publisherConfigurations, ftpesConfig); } else { this.dmaapConsumerConfiguration = consumerConfiguration; - this.publishingConfiguration = publisherConfiguration; + this.publishingConfigurations = publisherConfigurations; this.ftpesConfiguration = ftpesConfig; } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index d25d7db1..0242bef7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,13 +21,11 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; - import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set; - import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; /** @@ -51,7 +49,14 @@ public class CloudConfigParser { this.dmaapConfigurationRoot = dmaapConfigurationRoot; } - public Map<String, PublisherConfiguration> getDmaapPublisherConfig() throws DatafileTaskException { + /** + * Get the publisher configurations. + * + * @return a map with change identifier as key and the connected publisher configuration as value. + * + * @throws DatafileTaskException if a member of the configuration is missing. + */ + public Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException { Iterator<JsonElement> producerCfgs = toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator(); @@ -81,6 +86,12 @@ public class CloudConfigParser { return result; } + /** + * Get the consumer configuration. + * + * @return the consumer configuration. + * @throws DatafileTaskException if a member of the configuration is missing. + */ public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException { JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject(); Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet(); @@ -100,6 +111,12 @@ public class CloudConfigParser { .build(); } + /** + * Get the security configuration for communication with the xNF. + * + * @return the xNF communication security configuration. + * @throws DatafileTaskException if a member of the configuration is missing. + */ public FtpesConfig getFtpesConfig() throws DatafileTaskException { return new ImmutableFtpesConfig.Builder() // .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert")) diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java index 1fd24e98..e62a11e0 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java @@ -18,7 +18,6 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.net.MalformedURLException; import java.net.URL; - import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -42,6 +41,13 @@ public abstract class ConsumerConfiguration { public abstract Boolean enableDmaapCertAuth(); + /** + * Gets the configuration in the SDK version. + * + * @return a <code>DmaapConsumerConfiguration</code> representing the configuration. + * + * @throws DatafileTaskException if something is wrong with the topic URL. + */ public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException { try { URL url = new URL(topicUrl()); @@ -91,16 +97,14 @@ public abstract class ConsumerConfiguration { } private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException { - String[] tokens = urlPath.split("/"); // UrlPath: - // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12 + String[] tokens = urlPath.split("/"); // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12 if (tokens.length != 5) { throw new DatafileTaskException("The path has incorrect syntax: " + urlPath); } - final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // e.g. - // /events/unauthenticated.VES_NOTIFICATION_OUTPUT - final String consumerGroup = tokens[3]; // ex. OpenDcae-c12 - final String consumerId = tokens[4]; // ex. C12 + final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/unauthenticated.VES_NOTIFICATION_OUTPUT + final String consumerGroup = tokens[3]; // OpenDcae-c12 + final String consumerId = tokens[4]; // C12 return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java index 3c3a7625..7a845246 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java @@ -18,7 +18,6 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.net.MalformedURLException; import java.net.URL; - import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; @@ -50,6 +49,12 @@ public interface PublisherConfiguration { String changeIdentifier(); + /** + * Get the publisher configuration in SDK format. + * + * @return a <code>DmaapPublisherConfiguration</code> contining the publisher configuration. + * @throws MalformedURLException if the publish URL is malformed. + */ default DmaapPublisherConfiguration toDmaap() throws MalformedURLException { URL url = new URL(publishUrl()); String urlPath = url.getPath(); @@ -69,5 +74,4 @@ public interface PublisherConfiguration { .enableDmaapCertAuth(this.enableDmaapCertAuth()) // .build(); } - } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index e11cd76b..bdedba4e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -22,10 +22,8 @@ import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.jcraft.jsch.SftpException; - import java.nio.file.Path; import java.util.Optional; - import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; @@ -97,7 +95,7 @@ public class SftpClient implements FileCollectClient { throw new DatafileTaskException("Could not open Sftp client. " + e); } else { throw new NonRetryableDatafileTaskException( - "Could not open Sftp client, no retry attempts will be done " + e); + "Could not open Sftp client, no retry attempts will be done. " + e); } } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java index 4c42284e..878bb554 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java @@ -24,7 +24,6 @@ import java.time.Instant; import java.util.concurrent.atomic.AtomicInteger; /** - * * Various counters that can be shown via a REST API. * */ @@ -80,6 +79,7 @@ public class Counters { noOfFailedPublish++; } + @Override public synchronized String toString() { StringBuilder str = new StringBuilder(); str.append(format("totalReceivedEvents", totalReceivedEvents)); @@ -104,4 +104,49 @@ public class Counters { String header = name + ":"; return String.format("%-24s%-22s\n", header, value); } + + public int getNoOfCollectedFiles() { + return noOfCollectedFiles; + } + + public int getNoOfFailedFtpAttempts() { + return noOfFailedFtpAttempts; + } + + public int getNoOfFailedFtp() { + return noOfFailedFtp; + } + + public int getNoOfFailedPublishAttempts() { + return noOfFailedPublishAttempts; + } + + public int getTotalPublishedFiles() { + return totalPublishedFiles; + } + + public int getNoOfFailedPublish() { + return noOfFailedPublish; + } + + public int getTotalReceivedEvents() { + return totalReceivedEvents; + } + + /** + * Resets all data. + */ + public void clear() { + numberOfTasks.set(0); + numberOfSubscriptions.set(0); + noOfCollectedFiles = 0; + noOfFailedFtpAttempts = 0; + noOfFailedFtp = 0; + noOfFailedPublishAttempts = 0; + totalPublishedFiles = 0; + noOfFailedPublish = 0; + lastPublishedTime = Instant.MIN; + totalReceivedEvents = 0; + lastEventTime = Instant.MIN; + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java index 2ad96e8d..5a8806b4 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java @@ -18,13 +18,14 @@ package org.onap.dcaegen2.collectors.datafile.service; import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME; -import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.function.client.ClientRequest; +import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.Builder; @@ -37,11 +38,9 @@ import reactor.core.publisher.Mono; */ public class DmaapWebClient { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger logger = LoggerFactory.getLogger(DmaapWebClient.class); private String contentType; - private String dmaapUserName; - private String dmaapUserPassword; /** * Creating DmaapReactiveWebClient passing to them basic DmaapConfig. @@ -62,33 +61,31 @@ public class DmaapWebClient { public WebClient build() { Builder webClientBuilder = WebClient.builder() // .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) // - .filter(logRequest()) // - .filter(logResponse()); - if (dmaapUserName != null && !dmaapUserName.isEmpty() && dmaapUserPassword != null - && !dmaapUserPassword.isEmpty()) { - webClientBuilder.filter(basicAuthentication(dmaapUserName, dmaapUserPassword)); - } + .filter(getRequestFilter()) // + .filter(getResponseFilter()); return webClientBuilder.build(); } - private ExchangeFilterFunction logResponse() { - return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { - MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode())); - logger.trace("Response Status {}", clientResponse.statusCode()); - MDC.remove(RESPONSE_CODE); - return Mono.just(clientResponse); - }); + private ExchangeFilterFunction getResponseFilter() { + return ExchangeFilterFunction.ofResponseProcessor(this::logResponse); } - private ExchangeFilterFunction logRequest() { - return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { - MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url())); - logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url()); - clientRequest.headers() - .forEach((name, values) -> values.forEach(value -> logger.trace("{}={}", name, value))); - logger.trace("HTTP request headers: {}", clientRequest.headers()); - MDC.remove(SERVICE_NAME); - return Mono.just(clientRequest); - }); + Mono<ClientResponse> logResponse(ClientResponse clientResponse) { + MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode())); + logger.trace("Response Status {}", clientResponse.statusCode()); + MDC.remove(RESPONSE_CODE); + return Mono.just(clientResponse); + } + + private ExchangeFilterFunction getRequestFilter() { + return ExchangeFilterFunction.ofRequestProcessor(this::logRequest); + } + + Mono<ClientRequest> logRequest(ClientRequest clientRequest) { + MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url())); + logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url()); + logger.trace("HTTP request headers: {}", clientRequest.headers()); + MDC.remove(SERVICE_NAME); + return Mono.just(clientRequest); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java index 91b74042..a14bfd89 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -27,7 +27,7 @@ import java.util.Map; * the key was last used. */ public class PublishedFileCache { - private final Map<Path, Instant> publishedFiles = new HashMap<Path, Instant>(); + private final Map<Path, Instant> publishedFiles = new HashMap<>(); /** * Adds a file to the cache. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index bfd3f3e3..bdec7199 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -22,14 +22,11 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; import com.google.gson.JsonParser; - import java.io.File; -import java.io.IOException; import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; - import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ContentType; @@ -103,7 +100,6 @@ public class DataRouterPublisher { HttpResponse response = dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); logger.trace("{}", response); - counters.incTotalPublishedFiles(); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { counters.incNoOfFailedPublishAttempts(); @@ -127,26 +123,27 @@ public class DataRouterPublisher { MappedDiagnosticContext.appendTraceInfo(put); } - private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException { + private void prepareBody(FilePublishInformation publishInfo, HttpPut put) { File file = createInputFile(publishInfo.getInternalLocation()); FileEntity entity = new FileEntity(file, ContentType.DEFAULT_BINARY); put.setEntity(entity); } - private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, - FilePublishInformation publishInfo) { + private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); if (HttpUtils.isSuccessfulResponseCode(response.value())) { + counters.incTotalPublishedFiles(); logger.trace("Publishing file {} to DR successful!", publishInfo.getName()); return Mono.just(publishInfo); } else { + counters.incNoOfFailedPublishAttempts(); logger.warn("Publishing file {} to DR unsuccessful. Response code: {}", publishInfo.getName(), response); return Mono.error(new Exception( "Publishing file " + publishInfo.getName() + " to DR unsuccessful. Response code: " + response)); } } - File createInputFile(Path filePath) throws IOException { + File createInputFile(Path filePath) { FileSystemResource realResource = new FileSystemResource(filePath); return realResource.getFile(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 20bf599b..1ce64e41 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -21,7 +21,6 @@ import java.nio.file.Paths; import java.time.Duration; import java.util.Map; import java.util.Optional; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -104,15 +103,15 @@ public class FileCollector { currentClient.collectFile(remoteFile, localFile); counters.incNoOfCollectedFiles(); return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); + } catch (NonRetryableDatafileTaskException nre) { + logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), nre); + counters.incNoOfFailedFtpAttempts(); + return Mono.just(Optional.empty()); // Give up } catch (DatafileTaskException e) { logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), e.toString()); counters.incNoOfFailedFtpAttempts(); - if (e instanceof NonRetryableDatafileTaskException) { - return Mono.just(Optional.empty()); // Give up - } else { - return Mono.error(e); - } + return Mono.error(e); } catch (Exception throwable) { logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString(), throwable); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index bc73ddb2..26353e38 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -22,7 +22,6 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.Counters; @@ -36,7 +35,6 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -222,9 +220,8 @@ public class ScheduledTasks { Path localFilePath = fileData.fileData.getLocalFilePath(); if (publishedFilesCache.put(localFilePath) == null) { try { - boolean result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), + return !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.fileData.messageMetaData().changeIdentifier(), fileData.context); - return result; } catch (DatafileTaskException e) { logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e); return true; // Publish it then |