aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java46
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java25
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java18
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java8
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java51
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java11
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java5
11 files changed, 151 insertions, 79 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 8e15deb7..e9d84640 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -22,7 +22,6 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
-
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
@@ -32,10 +31,8 @@ import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
-
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
@@ -49,7 +46,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
-
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -69,12 +65,12 @@ public class AppConfig {
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
private ConsumerConfiguration dmaapConsumerConfiguration;
- private Map<String, PublisherConfiguration> publishingConfiguration;
+ Map<String, PublisherConfiguration> publishingConfigurations;
private FtpesConfig ftpesConfiguration;
private CloudConfigurationProvider cloudConfigurationProvider;
@Value("#{systemEnvironment}")
Properties systemEnvironment;
- private Disposable refreshConfigTask = null;
+ Disposable refreshConfigTask = null;
@NotEmpty
private String filepath;
@@ -104,6 +100,9 @@ public class AppConfig {
() -> logger.error("Configuration refresh terminated"));
}
+ /**
+ * Stops the refreshing of the configuration.
+ */
public void stop() {
if (refreshConfigTask != null) {
refreshConfigTask.dispose();
@@ -115,17 +114,33 @@ public class AppConfig {
return dmaapConsumerConfiguration;
}
+ /**
+ * Checks if there is a configuration for the given feed.
+ *
+ * @param changeIdentifier the change identifier the feed is configured to belong to.
+ *
+ * @return true if a feed is configured for the given change identifier, false if not.
+ */
public synchronized boolean isFeedConfigured(String changeIdentifier) {
- return publishingConfiguration.containsKey(changeIdentifier);
+ return publishingConfigurations.containsKey(changeIdentifier);
}
+ /**
+ * Gets the feed configuration for the given change identifier.
+ *
+ * @param changeIdentifier the change identifier the feed is configured to belong to.
+ * @return the <code>PublisherConfiguration</code> for the feed belonging to the given change identifier.
+ *
+ * @throws DatafileTaskException if no configuration has been loaded or the configuration is missing for the given
+ * change identifier.
+ */
public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier)
throws DatafileTaskException {
- if (publishingConfiguration == null) {
+ if (publishingConfigurations == null) {
throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier);
}
- PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier);
+ PublisherConfiguration cfg = publishingConfigurations.get(changeIdentifier);
if (cfg == null) {
throw new DatafileTaskException(
"Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier);
@@ -184,7 +199,7 @@ public class AppConfig {
private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) {
try {
CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
- setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(),
+ setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(),
parser.getFtpesConfig());
} catch (DatafileTaskException e) {
logger.error("Could not parse configuration {}", e.toString(), e);
@@ -212,14 +227,13 @@ public class AppConfig {
}
private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
- Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
- if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
- logger.error(
- "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
- consumerConfiguration, publisherConfiguration, ftpesConfig);
+ Map<String, PublisherConfiguration> publisherConfigurations, FtpesConfig ftpesConfig) {
+ if (consumerConfiguration == null || publisherConfigurations == null || ftpesConfig == null) {
+ logger.error("Problem with consumerConfiguration: {}, publisherConfigurations: {}, ftpesConfig: {}",
+ consumerConfiguration, publisherConfigurations, ftpesConfig);
} else {
this.dmaapConsumerConfiguration = consumerConfiguration;
- this.publishingConfiguration = publisherConfiguration;
+ this.publishingConfigurations = publisherConfigurations;
this.ftpesConfiguration = ftpesConfig;
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index d25d7db1..0242bef7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,13 +21,11 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
@@ -51,7 +49,14 @@ public class CloudConfigParser {
this.dmaapConfigurationRoot = dmaapConfigurationRoot;
}
- public Map<String, PublisherConfiguration> getDmaapPublisherConfig() throws DatafileTaskException {
+ /**
+ * Get the publisher configurations.
+ *
+ * @return a map with change identifier as key and the connected publisher configuration as value.
+ *
+ * @throws DatafileTaskException if a member of the configuration is missing.
+ */
+ public Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException {
Iterator<JsonElement> producerCfgs =
toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator();
@@ -81,6 +86,12 @@ public class CloudConfigParser {
return result;
}
+ /**
+ * Get the consumer configuration.
+ *
+ * @return the consumer configuration.
+ * @throws DatafileTaskException if a member of the configuration is missing.
+ */
public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject();
Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
@@ -100,6 +111,12 @@ public class CloudConfigParser {
.build();
}
+ /**
+ * Get the security configuration for communication with the xNF.
+ *
+ * @return the xNF communication security configuration.
+ * @throws DatafileTaskException if a member of the configuration is missing.
+ */
public FtpesConfig getFtpesConfig() throws DatafileTaskException {
return new ImmutableFtpesConfig.Builder() //
.keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert"))
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
index 1fd24e98..e62a11e0 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
@@ -18,7 +18,6 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.net.MalformedURLException;
import java.net.URL;
-
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -42,6 +41,13 @@ public abstract class ConsumerConfiguration {
public abstract Boolean enableDmaapCertAuth();
+ /**
+ * Gets the configuration in the SDK version.
+ *
+ * @return a <code>DmaapConsumerConfiguration</code> representing the configuration.
+ *
+ * @throws DatafileTaskException if something is wrong with the topic URL.
+ */
public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException {
try {
URL url = new URL(topicUrl());
@@ -91,16 +97,14 @@ public abstract class ConsumerConfiguration {
}
private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException {
- String[] tokens = urlPath.split("/"); // UrlPath:
- // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
+ String[] tokens = urlPath.split("/"); // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
if (tokens.length != 5) {
throw new DatafileTaskException("The path has incorrect syntax: " + urlPath);
}
- final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // e.g.
- // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
- final String consumerGroup = tokens[3]; // ex. OpenDcae-c12
- final String consumerId = tokens[4]; // ex. C12
+ final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
+ final String consumerGroup = tokens[3]; // OpenDcae-c12
+ final String consumerId = tokens[4]; // C12
return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
index 3c3a7625..7a845246 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
@@ -18,7 +18,6 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.net.MalformedURLException;
import java.net.URL;
-
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
@@ -50,6 +49,12 @@ public interface PublisherConfiguration {
String changeIdentifier();
+ /**
+ * Get the publisher configuration in SDK format.
+ *
+ * @return a <code>DmaapPublisherConfiguration</code> contining the publisher configuration.
+ * @throws MalformedURLException if the publish URL is malformed.
+ */
default DmaapPublisherConfiguration toDmaap() throws MalformedURLException {
URL url = new URL(publishUrl());
String urlPath = url.getPath();
@@ -69,5 +74,4 @@ public interface PublisherConfiguration {
.enableDmaapCertAuth(this.enableDmaapCertAuth()) //
.build();
}
-
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index e11cd76b..bdedba4e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -22,10 +22,8 @@ import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
-
import java.nio.file.Path;
import java.util.Optional;
-
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.slf4j.Logger;
@@ -97,7 +95,7 @@ public class SftpClient implements FileCollectClient {
throw new DatafileTaskException("Could not open Sftp client. " + e);
} else {
throw new NonRetryableDatafileTaskException(
- "Could not open Sftp client, no retry attempts will be done " + e);
+ "Could not open Sftp client, no retry attempts will be done. " + e);
}
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
index 4c42284e..878bb554 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
@@ -24,7 +24,6 @@ import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
/**
- *
* Various counters that can be shown via a REST API.
*
*/
@@ -80,6 +79,7 @@ public class Counters {
noOfFailedPublish++;
}
+ @Override
public synchronized String toString() {
StringBuilder str = new StringBuilder();
str.append(format("totalReceivedEvents", totalReceivedEvents));
@@ -104,4 +104,49 @@ public class Counters {
String header = name + ":";
return String.format("%-24s%-22s\n", header, value);
}
+
+ public int getNoOfCollectedFiles() {
+ return noOfCollectedFiles;
+ }
+
+ public int getNoOfFailedFtpAttempts() {
+ return noOfFailedFtpAttempts;
+ }
+
+ public int getNoOfFailedFtp() {
+ return noOfFailedFtp;
+ }
+
+ public int getNoOfFailedPublishAttempts() {
+ return noOfFailedPublishAttempts;
+ }
+
+ public int getTotalPublishedFiles() {
+ return totalPublishedFiles;
+ }
+
+ public int getNoOfFailedPublish() {
+ return noOfFailedPublish;
+ }
+
+ public int getTotalReceivedEvents() {
+ return totalReceivedEvents;
+ }
+
+ /**
+ * Resets all data.
+ */
+ public void clear() {
+ numberOfTasks.set(0);
+ numberOfSubscriptions.set(0);
+ noOfCollectedFiles = 0;
+ noOfFailedFtpAttempts = 0;
+ noOfFailedFtp = 0;
+ noOfFailedPublishAttempts = 0;
+ totalPublishedFiles = 0;
+ noOfFailedPublish = 0;
+ lastPublishedTime = Instant.MIN;
+ totalReceivedEvents = 0;
+ lastEventTime = Instant.MIN;
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
index 2ad96e8d..5a8806b4 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
@@ -18,13 +18,14 @@ package org.onap.dcaegen2.collectors.datafile.service;
import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME;
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.http.HttpHeaders;
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.Builder;
@@ -37,11 +38,9 @@ import reactor.core.publisher.Mono;
*/
public class DmaapWebClient {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger logger = LoggerFactory.getLogger(DmaapWebClient.class);
private String contentType;
- private String dmaapUserName;
- private String dmaapUserPassword;
/**
* Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
@@ -62,33 +61,31 @@ public class DmaapWebClient {
public WebClient build() {
Builder webClientBuilder = WebClient.builder() //
.defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) //
- .filter(logRequest()) //
- .filter(logResponse());
- if (dmaapUserName != null && !dmaapUserName.isEmpty() && dmaapUserPassword != null
- && !dmaapUserPassword.isEmpty()) {
- webClientBuilder.filter(basicAuthentication(dmaapUserName, dmaapUserPassword));
- }
+ .filter(getRequestFilter()) //
+ .filter(getResponseFilter());
return webClientBuilder.build();
}
- private ExchangeFilterFunction logResponse() {
- return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
- MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
- logger.trace("Response Status {}", clientResponse.statusCode());
- MDC.remove(RESPONSE_CODE);
- return Mono.just(clientResponse);
- });
+ private ExchangeFilterFunction getResponseFilter() {
+ return ExchangeFilterFunction.ofResponseProcessor(this::logResponse);
}
- private ExchangeFilterFunction logRequest() {
- return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
- MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
- logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
- clientRequest.headers()
- .forEach((name, values) -> values.forEach(value -> logger.trace("{}={}", name, value)));
- logger.trace("HTTP request headers: {}", clientRequest.headers());
- MDC.remove(SERVICE_NAME);
- return Mono.just(clientRequest);
- });
+ Mono<ClientResponse> logResponse(ClientResponse clientResponse) {
+ MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
+ logger.trace("Response Status {}", clientResponse.statusCode());
+ MDC.remove(RESPONSE_CODE);
+ return Mono.just(clientResponse);
+ }
+
+ private ExchangeFilterFunction getRequestFilter() {
+ return ExchangeFilterFunction.ofRequestProcessor(this::logRequest);
+ }
+
+ Mono<ClientRequest> logRequest(ClientRequest clientRequest) {
+ MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
+ logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
+ logger.trace("HTTP request headers: {}", clientRequest.headers());
+ MDC.remove(SERVICE_NAME);
+ return Mono.just(clientRequest);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 91b74042..a14bfd89 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -27,7 +27,7 @@ import java.util.Map;
* the key was last used.
*/
public class PublishedFileCache {
- private final Map<Path, Instant> publishedFiles = new HashMap<Path, Instant>();
+ private final Map<Path, Instant> publishedFiles = new HashMap<>();
/**
* Adds a file to the cache.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index bfd3f3e3..bdec7199 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -22,14 +22,11 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
-
import java.io.File;
-import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
-
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
@@ -103,7 +100,6 @@ public class DataRouterPublisher {
HttpResponse response =
dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
logger.trace("{}", response);
- counters.incTotalPublishedFiles();
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
counters.incNoOfFailedPublishAttempts();
@@ -127,26 +123,27 @@ public class DataRouterPublisher {
MappedDiagnosticContext.appendTraceInfo(put);
}
- private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException {
+ private void prepareBody(FilePublishInformation publishInfo, HttpPut put) {
File file = createInputFile(publishInfo.getInternalLocation());
FileEntity entity = new FileEntity(file, ContentType.DEFAULT_BINARY);
put.setEntity(entity);
}
- private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response,
- FilePublishInformation publishInfo) {
+ private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
+ counters.incTotalPublishedFiles();
logger.trace("Publishing file {} to DR successful!", publishInfo.getName());
return Mono.just(publishInfo);
} else {
+ counters.incNoOfFailedPublishAttempts();
logger.warn("Publishing file {} to DR unsuccessful. Response code: {}", publishInfo.getName(), response);
return Mono.error(new Exception(
"Publishing file " + publishInfo.getName() + " to DR unsuccessful. Response code: " + response));
}
}
- File createInputFile(Path filePath) throws IOException {
+ File createInputFile(Path filePath) {
FileSystemResource realResource = new FileSystemResource(filePath);
return realResource.getFile();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 20bf599b..1ce64e41 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -21,7 +21,6 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -104,15 +103,15 @@ public class FileCollector {
currentClient.collectFile(remoteFile, localFile);
counters.incNoOfCollectedFiles();
return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
+ } catch (NonRetryableDatafileTaskException nre) {
+ logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), nre);
+ counters.incNoOfFailedFtpAttempts();
+ return Mono.just(Optional.empty()); // Give up
} catch (DatafileTaskException e) {
logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
e.toString());
counters.incNoOfFailedFtpAttempts();
- if (e instanceof NonRetryableDatafileTaskException) {
- return Mono.just(Optional.empty()); // Give up
- } else {
- return Mono.error(e);
- }
+ return Mono.error(e);
} catch (Exception throwable) {
logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
throwable.toString(), throwable);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index bc73ddb2..26353e38 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -22,7 +22,6 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.Counters;
@@ -36,7 +35,6 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@@ -222,9 +220,8 @@ public class ScheduledTasks {
Path localFilePath = fileData.fileData.getLocalFilePath();
if (publishedFilesCache.put(localFilePath) == null) {
try {
- boolean result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
+ return !createPublishedChecker().isFilePublished(fileData.fileData.name(),
fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
- return result;
} catch (DatafileTaskException e) {
logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
return true; // Publish it then