summaryrefslogtreecommitdiffstats
path: root/datafile-app-server
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-06-14 06:38:21 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-06-14 06:38:21 +0000
commitdafd553cf1694585b35fd7132c6bafdef1e98ed6 (patch)
tree7882694255ee951de06e9e3060af508c9e184385 /datafile-app-server
parentaddf3f10dd0cc1d0797151633839c65276af8b1c (diff)
Bugfix, improved behaviour for large files
Previously files was read into a buffer for publishing. This does not work when files are bigger than the available memory. After the fix , files are streamed instead. Implemented a new REST primitive for exposing status and statistics. To be used for test and trouble shooting. Change-Id: Iab5a1ee9ffcbf6836fcf709d115bf25ab0391732 Issue-ID: DCAEGEN2-1532 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java)54
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java16
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java34
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java108
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java24
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java14
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java50
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java)37
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java3
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java8
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java12
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java18
14 files changed, 300 insertions, 97 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java
index c6d56c42..40f9d99b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java
@@ -19,6 +19,8 @@ package org.onap.dcaegen2.collectors.datafile.controllers;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.ENTRY;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.EXIT;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
+
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
@@ -37,28 +39,25 @@ import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
/**
- * Controller to check the heartbeat of DFC.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ * REST Controller to check the heart beat and status of the DFC.
*/
@RestController
-@Api(value = "HeartbeatController")
-public class HeartbeatController {
+@Api(value = "StatusController")
+public class StatusController {
- private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class);
+ private static final Logger logger = LoggerFactory.getLogger(StatusController.class);
private final ScheduledTasks scheduledTasks;
@Autowired
- public HeartbeatController(ScheduledTasks scheduledTasks) {
+ public StatusController(ScheduledTasks scheduledTasks) {
this.scheduledTasks = scheduledTasks;
}
/**
- * Checks the heartbeat of DFC.
+ * Checks the heart beat of DFC.
*
- * @return the heartbeat status of DFC.
+ * @return the heart beat status of DFC.
*/
@GetMapping("/heartbeat")
@ApiOperation(value = "Returns liveness of DATAFILE service")
@@ -66,18 +65,41 @@ public class HeartbeatController {
@ApiResponse(code = 200, message = "DATAFILE service is living"),
@ApiResponse(code = 401, message = "You are not authorized to view the resource"),
@ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
- @ApiResponse(code = 404, message = "The resource you were trying to reach is not found") })
+ @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")})
public Mono<ResponseEntity<String>> heartbeat(@RequestHeader HttpHeaders headers) {
MappedDiagnosticContext.initializeTraceContext(headers);
logger.info(ENTRY, "Heartbeat request");
- StringBuilder statusString = new StringBuilder("I'm living! Status: ");
- statusString.append("numberOfFileCollectionTasks=").append(scheduledTasks.getCurrentNumberOfTasks())
- .append(",");
- statusString.append("fileCacheSize=").append(scheduledTasks.publishedFilesCacheSize());
+ String statusString = "I'm living!";
- Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(statusString.toString(), HttpStatus.OK));
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(statusString, HttpStatus.OK));
logger.info(EXIT, "Heartbeat request");
return response;
}
+
+ /**
+ * Returns diagnostics and statistics information. It is intended for testing and trouble
+ * shooting.
+ *
+ * @return information.
+ */
+ @GetMapping("/status")
+ @ApiOperation(value = "Returns status and statistics of DATAFILE service")
+ @ApiResponses(value = { //
+ @ApiResponse(code = 200, message = "DATAFILE service is living"),
+ @ApiResponse(code = 401, message = "You are not authorized to view the resource"),
+ @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
+ @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")})
+ public Mono<ResponseEntity<String>> status(@RequestHeader HttpHeaders headers) {
+ MappedDiagnosticContext.initializeTraceContext(headers);
+ logger.info(ENTRY, "Status request");
+
+ Counters counters = scheduledTasks.getCounters();
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(counters.toString(), HttpStatus.OK));
+ logger.info(EXIT, "Status request");
+ return response;
+ }
+
+
+
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
index 38f74ed1..6aa76157 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -21,29 +21,13 @@ package org.onap.dcaegen2.collectors.datafile.exceptions;
public class DatafileTaskException extends Exception {
private static final long serialVersionUID = 1L;
- private final boolean isRetryable;
public DatafileTaskException(String message) {
super(message);
- isRetryable = true;
- }
-
- public DatafileTaskException(String message, boolean retry) {
- super(message);
- isRetryable = retry;
}
public DatafileTaskException(String message, Exception originalException) {
super(message, originalException);
- isRetryable = true;
}
- public DatafileTaskException(String message, Exception originalException, boolean retry) {
- super(message, originalException);
- isRetryable = retry;
- }
-
- public boolean isRetryable() {
- return this.isRetryable;
- }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java
new file mode 100644
index 00000000..a4bdd667
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.exceptions;
+
+
+public class NonRetryableDatafileTaskException extends DatafileTaskException {
+
+ private static final long serialVersionUID = 1L;
+
+ public NonRetryableDatafileTaskException(String message) {
+ super(message);
+ }
+
+ public NonRetryableDatafileTaskException(String message, Exception originalException) {
+ super(message, originalException);
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index bb3016ce..18288603 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -38,6 +38,7 @@ import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.net.ftp.FTPSClient;
import org.apache.commons.net.util.KeyManagerUtils;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.FileSystemResource;
@@ -119,8 +120,7 @@ public class FtpsClient implements FileCollectClient {
try (OutputStream output = createOutputStream(localFileName)) {
logger.trace("begin to retrieve from xNF.");
if (!realFtpsClient.retrieveFile(remoteFileName, output)) {
- final boolean retry = false; // Skip retrying for all problems except IOException
- throw new DatafileTaskException("Could not retrieve file " + remoteFileName, retry);
+ throw new NonRetryableDatafileTaskException("Could not retrieve file. No retry attempts will be done, file :" + remoteFileName);
}
} catch (IOException e) {
throw new DatafileTaskException("Could not fetch file: " + e, e);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index ec523354..f5d10d34 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -25,6 +25,7 @@ import com.jcraft.jsch.SftpException;
import java.nio.file.Path;
import java.util.Optional;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +58,12 @@ public class SftpClient implements FileCollectClient {
} catch (SftpException e) {
boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED
&& e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED;
- throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry);
+ if (retry) {
+ throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e);
+ } else {
+ throw new NonRetryableDatafileTaskException(
+ "Unable to get file from xNF. No retry attempts will be done. Data: " + fileServerData, e);
+ }
}
logger.trace("collectFile OK");
@@ -85,7 +91,12 @@ public class SftpClient implements FileCollectClient {
}
} catch (JSchException e) {
boolean retry = !e.getMessage().contains("Auth fail");
- throw new DatafileTaskException("Could not open Sftp client. " + e, e, retry);
+ if (retry) {
+ throw new DatafileTaskException("Could not open Sftp client. " + e);
+ } else {
+ throw new NonRetryableDatafileTaskException(
+ "Could not open Sftp client, no retry attempts will be done " + e);
+ }
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
new file mode 100644
index 00000000..5efbe37a
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ *
+ * Various counters that can be shown via a REST API.
+ *
+ */
+public class Counters {
+
+ private final AtomicInteger numberOfTasks = new AtomicInteger();
+ private final AtomicInteger numberOfSubscriptions = new AtomicInteger();
+ private int noOfCollectedFiles = 0;
+ private int noOfFailedFtpAttempts = 0;
+ private int noOfFailedFtp = 0;
+ private int noOfFailedPublishAttempts = 0;
+ private int totalPublishedFiles = 0;
+ private int noOfFailedPublish = 0;
+ private Instant lastPublishedTime = Instant.MIN;
+ private int totalReceivedEvents = 0;
+ private Instant lastEventTime = Instant.MIN;
+
+ public AtomicInteger getCurrentNumberOfTasks() {
+ return numberOfTasks;
+ }
+
+ public AtomicInteger getCurrentNumberOfSubscriptions() {
+ return numberOfSubscriptions;
+ }
+
+ public synchronized void incNoOfReceivedEvents() {
+ totalReceivedEvents++;
+ lastEventTime = Instant.now();
+ }
+
+ public synchronized void incNoOfCollectedFiles() {
+ noOfCollectedFiles++;
+ }
+
+ public synchronized void incNoOfFailedFtpAttempts() {
+ noOfFailedFtpAttempts++;
+ }
+
+ public synchronized void incNoOfFailedFtp() {
+ noOfFailedFtp++;
+ }
+
+ public synchronized void incNoOfFailedPublishAttempts() {
+ noOfFailedPublishAttempts++;
+ }
+
+ public synchronized void incTotalPublishedFiles() {
+ totalPublishedFiles++;
+ lastPublishedTime = Instant.now();
+ }
+
+ public synchronized void incNoOfFailedPublish() {
+ noOfFailedPublish++;
+ }
+
+ public synchronized String toString() {
+ StringBuilder str = new StringBuilder();
+ str.append(format("totalReceivedEvents", totalReceivedEvents));
+ str.append(format("lastEventTime", lastEventTime));
+ str.append(format("numberOfTasks", numberOfTasks));
+ str.append(format("numberOfSubscriptions", numberOfSubscriptions));
+ str.append("\n");
+ str.append(format("collectedFiles", noOfCollectedFiles));
+ str.append(format("failedFtpAttempts", noOfFailedFtpAttempts));
+ str.append(format("failedFtp", noOfFailedFtp));
+ str.append("\n");
+ str.append(format("totalPublishedFiles", totalPublishedFiles));
+ str.append(format("lastPublishedTime", lastPublishedTime));
+
+ str.append(format("failedPublishAttempts", noOfFailedPublishAttempts));
+ str.append(format("noOfFailedPublish", noOfFailedPublish));
+
+ return str.toString();
+ }
+
+ private String format(String name, Object value) {
+ String header = name + ":";
+ return String.format("%-24s%-22s\n", header, value);
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index d9efe802..02e153cf 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -22,19 +22,20 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
-import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.FileEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
@@ -62,9 +63,11 @@ public class DataRouterPublisher {
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
+ private final Counters counters;
- public DataRouterPublisher(AppConfig datafileAppConfig) {
+ public DataRouterPublisher(AppConfig datafileAppConfig, Counters counters) {
this.datafileAppConfig = datafileAppConfig;
+ this.counters = counters;
}
/**
@@ -98,8 +101,10 @@ public class DataRouterPublisher {
HttpResponse response =
dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
logger.trace("{}", response);
+ counters.incTotalPublishedFiles();
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
+ counters.incNoOfFailedPublishAttempts();
logger.warn("Publishing file {} to DR unsuccessful.", publishInfo.getName(), e);
return Mono.error(e);
}
@@ -121,10 +126,9 @@ public class DataRouterPublisher {
}
private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException {
- Path fileLocation = publishInfo.getInternalLocation();
- try (InputStream fileInputStream = createInputStream(fileLocation)) {
- put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
- }
+ File file = createInputFile(publishInfo.getInternalLocation());
+ FileEntity entity = new FileEntity(file, ContentType.DEFAULT_BINARY);
+ put.setEntity(entity);
}
private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response,
@@ -140,9 +144,9 @@ public class DataRouterPublisher {
}
}
- InputStream createInputStream(Path filePath) throws IOException {
+ File createInputFile(Path filePath) throws IOException {
FileSystemResource realResource = new FileSystemResource(filePath);
- return realResource.getInputStream();
+ return realResource.getFile();
}
PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index aeacaffc..311f752d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -24,9 +24,11 @@ import java.util.Optional;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
@@ -45,14 +47,16 @@ public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
private final AppConfig datafileAppConfig;
+ private final Counters counters;
/**
* Constructor.
*
* @param datafileAppConfig application configuration
*/
- public FileCollector(AppConfig datafileAppConfig) {
+ public FileCollector(AppConfig datafileAppConfig, Counters counters) {
this.datafileAppConfig = datafileAppConfig;
+ this.counters = counters;
}
/**
@@ -97,14 +101,16 @@ public class FileCollector {
currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
+ counters.incNoOfCollectedFiles();
return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
} catch (DatafileTaskException e) {
logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
e.toString());
- if (e.isRetryable()) {
- return Mono.error(e);
- } else {
+ counters.incNoOfFailedFtpAttempts();
+ if (e instanceof NonRetryableDatafileTaskException) {
return Mono.just(Optional.empty()); // Give up
+ } else {
+ return Mono.error(e);
}
} catch (Exception throwable) {
logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 99b2d918..300ca601 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -41,8 +42,8 @@ import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
- * files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
@@ -57,11 +58,12 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
- private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final AtomicInteger currentNumberOfTasks;
private final AtomicInteger threadPoolQueueSize = new AtomicInteger();
- private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger();
+ private final AtomicInteger currentNumberOfSubscriptions;
private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
PublishedFileCache publishedFilesCache = new PublishedFileCache();
+ private Counters counters = new Counters();
/**
* Constructor for task registration in Datafile Workflow.
@@ -71,6 +73,8 @@ public class ScheduledTasks {
@Autowired
public ScheduledTasks(AppConfig applicationConfiguration) {
this.applicationConfiguration = applicationConfiguration;
+ this.currentNumberOfTasks = counters.getCurrentNumberOfTasks();
+ this.currentNumberOfSubscriptions = counters.getCurrentNumberOfSubscriptions();
}
/**
@@ -112,6 +116,7 @@ public class ScheduledTasks {
Flux<FilePublishInformation> createMainTask(Map<String, String> context) {
return fetchMoreFileReadyMessages() //
.doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) //
+ .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
.parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
@@ -148,32 +153,21 @@ public class ScheduledTasks {
return new PublishedChecker(applicationConfiguration);
}
- public int getCurrentNumberOfTasks() {
- return currentNumberOfTasks.get();
+ public Counters getCounters() {
+ return this.counters;
}
- public int publishedFilesCacheSize() {
- return publishedFilesCache.size();
- }
-
- public int getCurrentNumberOfSubscriptions() {
- return currentNumberOfSubscriptions.get();
- }
-
- public int getThreadPoolQueueSize() {
- return this.threadPoolQueueSize.get();
- }
protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
protected FileCollector createFileCollector() {
- return new FileCollector(applicationConfiguration);
+ return new FileCollector(applicationConfiguration, counters);
}
protected DataRouterPublisher createDataRouterPublisher() {
- return new DataRouterPublisher(applicationConfiguration);
+ return new DataRouterPublisher(applicationConfiguration, counters);
}
private static void onComplete(Map<String, String> contextMap) {
@@ -181,6 +175,22 @@ public class ScheduledTasks {
logger.trace("Datafile tasks have been completed");
}
+ int publishedFilesCacheSize() {
+ return publishedFilesCache.size();
+ }
+
+ int getCurrentNumberOfTasks() {
+ return this.currentNumberOfTasks.get();
+ }
+
+ int getCurrentNumberOfSubscriptions() {
+ return this.currentNumberOfSubscriptions.get();
+ }
+
+ int getThreadPoolQueueSize() {
+ return this.threadPoolQueueSize.get();
+ }
+
private static synchronized void onSuccess(FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
logger.info("Datafile file published {}", publishInfo.getInternalLocation());
@@ -239,6 +249,7 @@ public class ScheduledTasks {
deleteFile(localFilePath, fileData.context);
publishedFilesCache.remove(localFilePath);
currentNumberOfTasks.decrementAndGet();
+ counters.incNoOfFailedFtp();
return Mono.empty();
}
@@ -257,6 +268,7 @@ public class ScheduledTasks {
deleteFile(internalFileName, publishInfo.getContext());
publishedFilesCache.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
+ counters.incNoOfFailedPublish();
return Mono.empty();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java
index 012a6b3d..51097f52 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -31,7 +32,8 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.controllers.HeartbeatController;
+import org.onap.dcaegen2.collectors.datafile.controllers.StatusController;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
@@ -40,31 +42,48 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;
-public class HeartbeatControllerTest {
+public class StatusControllerTest {
@Test
public void heartbeat_success() {
ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
- when(scheduledTasksMock.getCurrentNumberOfTasks()).thenReturn(10);
- when(scheduledTasksMock.publishedFilesCacheSize()).thenReturn(20);
HttpHeaders httpHeaders = new HttpHeaders();
- HeartbeatController controllerUnderTest = new HeartbeatController(scheduledTasksMock);
+ StatusController controllerUnderTest = new StatusController(scheduledTasksMock);
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(HeartbeatController.class);
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(StatusController.class);
Mono<ResponseEntity<String>> result = controllerUnderTest.heartbeat(httpHeaders);
validateLogging(logAppender);
String body = result.block().getBody();
- assertTrue(body.startsWith("I'm living! Status: "));
- assertTrue(body.contains("numberOfFileCollectionTasks=10"));
- assertTrue(body.contains("fileCacheSize=20"));
+ assertTrue(body.startsWith("I'm living!"));
assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID)));
}
+
+ @Test
+ public void status() {
+ ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
+ Counters counters = new Counters();
+ doReturn(counters).when(scheduledTasksMock).getCounters();
+
+ HttpHeaders httpHeaders = new HttpHeaders();
+
+ StatusController controllerUnderTest = new StatusController(scheduledTasksMock);
+
+ Mono<ResponseEntity<String>> result = controllerUnderTest.status(httpHeaders);
+
+ String body = result.block().getBody();
+ System.out.println(body);
+
+ assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
+ assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID)));
+ }
+
+
private void validateLogging(ListAppender<ILoggingEvent> logAppender) {
assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY");
assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID"));
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
index e0182560..f4e814f4 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
@@ -205,7 +205,8 @@ public class FtpsClientTest {
doReturn(false).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
assertThatThrownBy(() -> clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
- .hasMessage("Could not retrieve file /dir/sample.txt");
+ .hasMessageContaining(REMOTE_FILE_PATH)
+ .hasMessageContaining("No retry");
verifyFtpsClientMock_openOk();
verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
index cb3735be..693806c2 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
@@ -121,8 +121,7 @@ public class SftpClientTest {
doReturn(jschMock).when(sftpClientSpy).createJsch();
when(jschMock.getSession(anyString(), anyString(), anyInt())).thenThrow(new JSchException("Failed"));
- assertThatThrownBy(() -> sftpClientSpy.open())
- .hasMessageStartingWith("Could not open Sftp client. com.jcraft.jsch.JSchException: Failed");
+ assertThatThrownBy(() -> sftpClientSpy.open()).hasMessageStartingWith("Could not open Sftp client.");
}
@SuppressWarnings("resource")
@@ -161,8 +160,9 @@ public class SftpClientTest {
assertThatThrownBy(() -> sftpClient.collectFile("remoteFile", Paths.get("localFile")))
.isInstanceOf(DatafileTaskException.class)
- .hasMessageStartingWith("Unable to get file from xNF. Data: FileServerData{serverAddress=" + HOST
- + ", " + "userId=" + USERNAME + ", password=####, port=" + SFTP_PORT);
+ .hasMessageStartingWith("Unable to get file from xNF. No retry attempts will be done")
+ .hasMessageContaining("Data: FileServerData{serverAddress=" + HOST + ", " + "userId=" + USERNAME
+ + ", password=####, port=" + SFTP_PORT);
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 4da22cbf..6a9dccda 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -27,10 +27,10 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.io.File;
+
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -50,6 +50,7 @@ import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
@@ -86,7 +87,6 @@ class DataRouterPublisherTest {
private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
private static final String PUBLISH_TOPIC = "publish";
private static final String FEED_ID = "1";
- private static final String FILE_CONTENT = "Just a string.";
private static FilePublishInformation filePublishInformation;
private static DmaapProducerHttpClient httpClientMock;
@@ -120,7 +120,7 @@ class DataRouterPublisherTest {
.changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
appConfig = mock(AppConfig.class);
- publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
+ publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig, new Counters()));
}
@Test
@@ -236,8 +236,8 @@ class DataRouterPublisherTest {
when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses);
- InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
- doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", PM_FILE_NAME));
+ File file = File.createTempFile("DFC", "tmp");
+ doReturn(file).when(publisherTaskUnderTestSpy).createInputFile(Paths.get("target", PM_FILE_NAME));
}
private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index 299a0238..99e92bd2 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -37,9 +37,11 @@ import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
@@ -93,6 +95,7 @@ public class FileCollectorTest {
private SftpClient sftpClientMock = mock(SftpClient.class);
private final Map<String, String> contextMap = new HashMap<>();
+ private final Counters counters = new Counters();
private MessageMetaData createMessageMetaData() {
return ImmutableMessageMetaData.builder() //
@@ -133,7 +136,7 @@ public class FileCollectorTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
- .context(new HashMap<String,String>()) //
+ .context(new HashMap<String, String>()) //
.changeIdentifier(CHANGE_IDENTIFIER) //
.build();
}
@@ -152,7 +155,7 @@ public class FileCollectorTest {
@Test
public void whenFtpesFile_returnCorrectResponse() throws Exception {
- FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters));
doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
@@ -173,7 +176,7 @@ public class FileCollectorTest {
@Test
public void whenSftpFile_returnCorrectResponse() throws Exception {
- FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters));
doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any());
@@ -201,7 +204,7 @@ public class FileCollectorTest {
@Test
public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
- FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters));
doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS);
@@ -217,12 +220,11 @@ public class FileCollectorTest {
@Test
public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception {
- FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, new Counters()));
doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
- final boolean retry = false;
FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS);
- doThrow(new DatafileTaskException("Unable to collect file.", retry)).when(ftpsClientMock)
+ doThrow(new NonRetryableDatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
@@ -234,7 +236,7 @@ public class FileCollectorTest {
@Test
public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception {
- FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
+ FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters));
doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);