aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-06-14 06:38:21 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-06-14 06:38:21 +0000
commitdafd553cf1694585b35fd7132c6bafdef1e98ed6 (patch)
tree7882694255ee951de06e9e3060af508c9e184385 /datafile-app-server/src/main/java/org
parentaddf3f10dd0cc1d0797151633839c65276af8b1c (diff)
Bugfix, improved behaviour for large files
Previously files was read into a buffer for publishing. This does not work when files are bigger than the available memory. After the fix , files are streamed instead. Implemented a new REST primitive for exposing status and statistics. To be used for test and trouble shooting. Change-Id: Iab5a1ee9ffcbf6836fcf709d115bf25ab0391732 Issue-ID: DCAEGEN2-1532 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src/main/java/org')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java)54
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java16
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java34
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java108
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java24
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java14
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java50
9 files changed, 250 insertions, 69 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java
index c6d56c42..40f9d99b 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java
@@ -19,6 +19,8 @@ package org.onap.dcaegen2.collectors.datafile.controllers;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.ENTRY;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.EXIT;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
+
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
@@ -37,28 +39,25 @@ import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
/**
- * Controller to check the heartbeat of DFC.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ * REST Controller to check the heart beat and status of the DFC.
*/
@RestController
-@Api(value = "HeartbeatController")
-public class HeartbeatController {
+@Api(value = "StatusController")
+public class StatusController {
- private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class);
+ private static final Logger logger = LoggerFactory.getLogger(StatusController.class);
private final ScheduledTasks scheduledTasks;
@Autowired
- public HeartbeatController(ScheduledTasks scheduledTasks) {
+ public StatusController(ScheduledTasks scheduledTasks) {
this.scheduledTasks = scheduledTasks;
}
/**
- * Checks the heartbeat of DFC.
+ * Checks the heart beat of DFC.
*
- * @return the heartbeat status of DFC.
+ * @return the heart beat status of DFC.
*/
@GetMapping("/heartbeat")
@ApiOperation(value = "Returns liveness of DATAFILE service")
@@ -66,18 +65,41 @@ public class HeartbeatController {
@ApiResponse(code = 200, message = "DATAFILE service is living"),
@ApiResponse(code = 401, message = "You are not authorized to view the resource"),
@ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
- @ApiResponse(code = 404, message = "The resource you were trying to reach is not found") })
+ @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")})
public Mono<ResponseEntity<String>> heartbeat(@RequestHeader HttpHeaders headers) {
MappedDiagnosticContext.initializeTraceContext(headers);
logger.info(ENTRY, "Heartbeat request");
- StringBuilder statusString = new StringBuilder("I'm living! Status: ");
- statusString.append("numberOfFileCollectionTasks=").append(scheduledTasks.getCurrentNumberOfTasks())
- .append(",");
- statusString.append("fileCacheSize=").append(scheduledTasks.publishedFilesCacheSize());
+ String statusString = "I'm living!";
- Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(statusString.toString(), HttpStatus.OK));
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(statusString, HttpStatus.OK));
logger.info(EXIT, "Heartbeat request");
return response;
}
+
+ /**
+ * Returns diagnostics and statistics information. It is intended for testing and trouble
+ * shooting.
+ *
+ * @return information.
+ */
+ @GetMapping("/status")
+ @ApiOperation(value = "Returns status and statistics of DATAFILE service")
+ @ApiResponses(value = { //
+ @ApiResponse(code = 200, message = "DATAFILE service is living"),
+ @ApiResponse(code = 401, message = "You are not authorized to view the resource"),
+ @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
+ @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")})
+ public Mono<ResponseEntity<String>> status(@RequestHeader HttpHeaders headers) {
+ MappedDiagnosticContext.initializeTraceContext(headers);
+ logger.info(ENTRY, "Status request");
+
+ Counters counters = scheduledTasks.getCounters();
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(counters.toString(), HttpStatus.OK));
+ logger.info(EXIT, "Status request");
+ return response;
+ }
+
+
+
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
index 38f74ed1..6aa76157 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -21,29 +21,13 @@ package org.onap.dcaegen2.collectors.datafile.exceptions;
public class DatafileTaskException extends Exception {
private static final long serialVersionUID = 1L;
- private final boolean isRetryable;
public DatafileTaskException(String message) {
super(message);
- isRetryable = true;
- }
-
- public DatafileTaskException(String message, boolean retry) {
- super(message);
- isRetryable = retry;
}
public DatafileTaskException(String message, Exception originalException) {
super(message, originalException);
- isRetryable = true;
}
- public DatafileTaskException(String message, Exception originalException, boolean retry) {
- super(message, originalException);
- isRetryable = retry;
- }
-
- public boolean isRetryable() {
- return this.isRetryable;
- }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java
new file mode 100644
index 00000000..a4bdd667
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.exceptions;
+
+
+public class NonRetryableDatafileTaskException extends DatafileTaskException {
+
+ private static final long serialVersionUID = 1L;
+
+ public NonRetryableDatafileTaskException(String message) {
+ super(message);
+ }
+
+ public NonRetryableDatafileTaskException(String message, Exception originalException) {
+ super(message, originalException);
+ }
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index bb3016ce..18288603 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -38,6 +38,7 @@ import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.net.ftp.FTPSClient;
import org.apache.commons.net.util.KeyManagerUtils;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.FileSystemResource;
@@ -119,8 +120,7 @@ public class FtpsClient implements FileCollectClient {
try (OutputStream output = createOutputStream(localFileName)) {
logger.trace("begin to retrieve from xNF.");
if (!realFtpsClient.retrieveFile(remoteFileName, output)) {
- final boolean retry = false; // Skip retrying for all problems except IOException
- throw new DatafileTaskException("Could not retrieve file " + remoteFileName, retry);
+ throw new NonRetryableDatafileTaskException("Could not retrieve file. No retry attempts will be done, file :" + remoteFileName);
}
} catch (IOException e) {
throw new DatafileTaskException("Could not fetch file: " + e, e);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index ec523354..f5d10d34 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -25,6 +25,7 @@ import com.jcraft.jsch.SftpException;
import java.nio.file.Path;
import java.util.Optional;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +58,12 @@ public class SftpClient implements FileCollectClient {
} catch (SftpException e) {
boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED
&& e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED;
- throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry);
+ if (retry) {
+ throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e);
+ } else {
+ throw new NonRetryableDatafileTaskException(
+ "Unable to get file from xNF. No retry attempts will be done. Data: " + fileServerData, e);
+ }
}
logger.trace("collectFile OK");
@@ -85,7 +91,12 @@ public class SftpClient implements FileCollectClient {
}
} catch (JSchException e) {
boolean retry = !e.getMessage().contains("Auth fail");
- throw new DatafileTaskException("Could not open Sftp client. " + e, e, retry);
+ if (retry) {
+ throw new DatafileTaskException("Could not open Sftp client. " + e);
+ } else {
+ throw new NonRetryableDatafileTaskException(
+ "Could not open Sftp client, no retry attempts will be done " + e);
+ }
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
new file mode 100644
index 00000000..5efbe37a
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ *
+ * Various counters that can be shown via a REST API.
+ *
+ */
+public class Counters {
+
+ private final AtomicInteger numberOfTasks = new AtomicInteger();
+ private final AtomicInteger numberOfSubscriptions = new AtomicInteger();
+ private int noOfCollectedFiles = 0;
+ private int noOfFailedFtpAttempts = 0;
+ private int noOfFailedFtp = 0;
+ private int noOfFailedPublishAttempts = 0;
+ private int totalPublishedFiles = 0;
+ private int noOfFailedPublish = 0;
+ private Instant lastPublishedTime = Instant.MIN;
+ private int totalReceivedEvents = 0;
+ private Instant lastEventTime = Instant.MIN;
+
+ public AtomicInteger getCurrentNumberOfTasks() {
+ return numberOfTasks;
+ }
+
+ public AtomicInteger getCurrentNumberOfSubscriptions() {
+ return numberOfSubscriptions;
+ }
+
+ public synchronized void incNoOfReceivedEvents() {
+ totalReceivedEvents++;
+ lastEventTime = Instant.now();
+ }
+
+ public synchronized void incNoOfCollectedFiles() {
+ noOfCollectedFiles++;
+ }
+
+ public synchronized void incNoOfFailedFtpAttempts() {
+ noOfFailedFtpAttempts++;
+ }
+
+ public synchronized void incNoOfFailedFtp() {
+ noOfFailedFtp++;
+ }
+
+ public synchronized void incNoOfFailedPublishAttempts() {
+ noOfFailedPublishAttempts++;
+ }
+
+ public synchronized void incTotalPublishedFiles() {
+ totalPublishedFiles++;
+ lastPublishedTime = Instant.now();
+ }
+
+ public synchronized void incNoOfFailedPublish() {
+ noOfFailedPublish++;
+ }
+
+ public synchronized String toString() {
+ StringBuilder str = new StringBuilder();
+ str.append(format("totalReceivedEvents", totalReceivedEvents));
+ str.append(format("lastEventTime", lastEventTime));
+ str.append(format("numberOfTasks", numberOfTasks));
+ str.append(format("numberOfSubscriptions", numberOfSubscriptions));
+ str.append("\n");
+ str.append(format("collectedFiles", noOfCollectedFiles));
+ str.append(format("failedFtpAttempts", noOfFailedFtpAttempts));
+ str.append(format("failedFtp", noOfFailedFtp));
+ str.append("\n");
+ str.append(format("totalPublishedFiles", totalPublishedFiles));
+ str.append(format("lastPublishedTime", lastPublishedTime));
+
+ str.append(format("failedPublishAttempts", noOfFailedPublishAttempts));
+ str.append(format("noOfFailedPublish", noOfFailedPublish));
+
+ return str.toString();
+ }
+
+ private String format(String name, Object value) {
+ String header = name + ":";
+ return String.format("%-24s%-22s\n", header, value);
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index d9efe802..02e153cf 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -22,19 +22,20 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
-import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.FileEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
@@ -62,9 +63,11 @@ public class DataRouterPublisher {
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
+ private final Counters counters;
- public DataRouterPublisher(AppConfig datafileAppConfig) {
+ public DataRouterPublisher(AppConfig datafileAppConfig, Counters counters) {
this.datafileAppConfig = datafileAppConfig;
+ this.counters = counters;
}
/**
@@ -98,8 +101,10 @@ public class DataRouterPublisher {
HttpResponse response =
dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
logger.trace("{}", response);
+ counters.incTotalPublishedFiles();
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
+ counters.incNoOfFailedPublishAttempts();
logger.warn("Publishing file {} to DR unsuccessful.", publishInfo.getName(), e);
return Mono.error(e);
}
@@ -121,10 +126,9 @@ public class DataRouterPublisher {
}
private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException {
- Path fileLocation = publishInfo.getInternalLocation();
- try (InputStream fileInputStream = createInputStream(fileLocation)) {
- put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
- }
+ File file = createInputFile(publishInfo.getInternalLocation());
+ FileEntity entity = new FileEntity(file, ContentType.DEFAULT_BINARY);
+ put.setEntity(entity);
}
private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response,
@@ -140,9 +144,9 @@ public class DataRouterPublisher {
}
}
- InputStream createInputStream(Path filePath) throws IOException {
+ File createInputFile(Path filePath) throws IOException {
FileSystemResource realResource = new FileSystemResource(filePath);
- return realResource.getInputStream();
+ return realResource.getFile();
}
PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index aeacaffc..311f752d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -24,9 +24,11 @@ import java.util.Optional;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
@@ -45,14 +47,16 @@ public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
private final AppConfig datafileAppConfig;
+ private final Counters counters;
/**
* Constructor.
*
* @param datafileAppConfig application configuration
*/
- public FileCollector(AppConfig datafileAppConfig) {
+ public FileCollector(AppConfig datafileAppConfig, Counters counters) {
this.datafileAppConfig = datafileAppConfig;
+ this.counters = counters;
}
/**
@@ -97,14 +101,16 @@ public class FileCollector {
currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
+ counters.incNoOfCollectedFiles();
return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
} catch (DatafileTaskException e) {
logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
e.toString());
- if (e.isRetryable()) {
- return Mono.error(e);
- } else {
+ counters.incNoOfFailedFtpAttempts();
+ if (e instanceof NonRetryableDatafileTaskException) {
return Mono.just(Optional.empty()); // Give up
+ } else {
+ return Mono.error(e);
}
} catch (Exception throwable) {
logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 99b2d918..300ca601 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -41,8 +42,8 @@ import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
- * files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
@@ -57,11 +58,12 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
- private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final AtomicInteger currentNumberOfTasks;
private final AtomicInteger threadPoolQueueSize = new AtomicInteger();
- private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger();
+ private final AtomicInteger currentNumberOfSubscriptions;
private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
PublishedFileCache publishedFilesCache = new PublishedFileCache();
+ private Counters counters = new Counters();
/**
* Constructor for task registration in Datafile Workflow.
@@ -71,6 +73,8 @@ public class ScheduledTasks {
@Autowired
public ScheduledTasks(AppConfig applicationConfiguration) {
this.applicationConfiguration = applicationConfiguration;
+ this.currentNumberOfTasks = counters.getCurrentNumberOfTasks();
+ this.currentNumberOfSubscriptions = counters.getCurrentNumberOfSubscriptions();
}
/**
@@ -112,6 +116,7 @@ public class ScheduledTasks {
Flux<FilePublishInformation> createMainTask(Map<String, String> context) {
return fetchMoreFileReadyMessages() //
.doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) //
+ .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
.parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
@@ -148,32 +153,21 @@ public class ScheduledTasks {
return new PublishedChecker(applicationConfiguration);
}
- public int getCurrentNumberOfTasks() {
- return currentNumberOfTasks.get();
+ public Counters getCounters() {
+ return this.counters;
}
- public int publishedFilesCacheSize() {
- return publishedFilesCache.size();
- }
-
- public int getCurrentNumberOfSubscriptions() {
- return currentNumberOfSubscriptions.get();
- }
-
- public int getThreadPoolQueueSize() {
- return this.threadPoolQueueSize.get();
- }
protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
protected FileCollector createFileCollector() {
- return new FileCollector(applicationConfiguration);
+ return new FileCollector(applicationConfiguration, counters);
}
protected DataRouterPublisher createDataRouterPublisher() {
- return new DataRouterPublisher(applicationConfiguration);
+ return new DataRouterPublisher(applicationConfiguration, counters);
}
private static void onComplete(Map<String, String> contextMap) {
@@ -181,6 +175,22 @@ public class ScheduledTasks {
logger.trace("Datafile tasks have been completed");
}
+ int publishedFilesCacheSize() {
+ return publishedFilesCache.size();
+ }
+
+ int getCurrentNumberOfTasks() {
+ return this.currentNumberOfTasks.get();
+ }
+
+ int getCurrentNumberOfSubscriptions() {
+ return this.currentNumberOfSubscriptions.get();
+ }
+
+ int getThreadPoolQueueSize() {
+ return this.threadPoolQueueSize.get();
+ }
+
private static synchronized void onSuccess(FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
logger.info("Datafile file published {}", publishInfo.getInternalLocation());
@@ -239,6 +249,7 @@ public class ScheduledTasks {
deleteFile(localFilePath, fileData.context);
publishedFilesCache.remove(localFilePath);
currentNumberOfTasks.decrementAndGet();
+ counters.incNoOfFailedFtp();
return Mono.empty();
}
@@ -257,6 +268,7 @@ public class ScheduledTasks {
deleteFile(internalFileName, publishInfo.getContext());
publishedFilesCache.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
+ counters.incNoOfFailedPublish();
return Mono.empty();
}