From dafd553cf1694585b35fd7132c6bafdef1e98ed6 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 14 Jun 2019 06:38:21 +0000 Subject: Bugfix, improved behaviour for large files Previously files was read into a buffer for publishing. This does not work when files are bigger than the available memory. After the fix , files are streamed instead. Implemented a new REST primitive for exposing status and statistics. To be used for test and trouble shooting. Change-Id: Iab5a1ee9ffcbf6836fcf709d115bf25ab0391732 Issue-ID: DCAEGEN2-1532 Signed-off-by: PatrikBuhr --- .../datafile/controllers/HeartbeatController.java | 83 ---------------- .../datafile/controllers/StatusController.java | 105 ++++++++++++++++++++ .../datafile/exceptions/DatafileTaskException.java | 16 --- .../NonRetryableDatafileTaskException.java | 34 +++++++ .../collectors/datafile/ftp/FtpsClient.java | 4 +- .../collectors/datafile/ftp/SftpClient.java | 15 ++- .../collectors/datafile/model/Counters.java | 108 +++++++++++++++++++++ .../datafile/tasks/DataRouterPublisher.java | 24 +++-- .../collectors/datafile/tasks/FileCollector.java | 14 ++- .../collectors/datafile/tasks/ScheduledTasks.java | 50 ++++++---- .../controller/HeartbeatControllerTest.java | 76 --------------- .../datafile/controller/StatusControllerTest.java | 95 ++++++++++++++++++ .../collectors/datafile/ftp/FtpsClientTest.java | 3 +- .../collectors/datafile/ftp/SftpClientTest.java | 8 +- .../datafile/tasks/DataRouterPublisherTest.java | 12 +-- .../datafile/tasks/FileCollectorTest.java | 18 ++-- 16 files changed, 434 insertions(+), 231 deletions(-) delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java delete mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java create mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java deleted file mode 100644 index c6d56c42..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java +++ /dev/null @@ -1,83 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.controllers; - -import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.ENTRY; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.EXIT; - -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; -import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestHeader; -import org.springframework.web.bind.annotation.RestController; -import reactor.core.publisher.Mono; - -/** - * Controller to check the heartbeat of DFC. - * - * @author Przemysław Wąsala on 4/19/18 - * @author Henrik Andersson - */ -@RestController -@Api(value = "HeartbeatController") -public class HeartbeatController { - - private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class); - - private final ScheduledTasks scheduledTasks; - - @Autowired - public HeartbeatController(ScheduledTasks scheduledTasks) { - this.scheduledTasks = scheduledTasks; - } - - /** - * Checks the heartbeat of DFC. - * - * @return the heartbeat status of DFC. - */ - @GetMapping("/heartbeat") - @ApiOperation(value = "Returns liveness of DATAFILE service") - @ApiResponses(value = { // - @ApiResponse(code = 200, message = "DATAFILE service is living"), - @ApiResponse(code = 401, message = "You are not authorized to view the resource"), - @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), - @ApiResponse(code = 404, message = "The resource you were trying to reach is not found") }) - public Mono> heartbeat(@RequestHeader HttpHeaders headers) { - MappedDiagnosticContext.initializeTraceContext(headers); - logger.info(ENTRY, "Heartbeat request"); - - StringBuilder statusString = new StringBuilder("I'm living! Status: "); - statusString.append("numberOfFileCollectionTasks=").append(scheduledTasks.getCurrentNumberOfTasks()) - .append(","); - statusString.append("fileCacheSize=").append(scheduledTasks.publishedFilesCacheSize()); - - Mono> response = Mono.just(new ResponseEntity<>(statusString.toString(), HttpStatus.OK)); - logger.info(EXIT, "Heartbeat request"); - return response; - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java new file mode 100644 index 00000000..40f9d99b --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java @@ -0,0 +1,105 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.controllers; + +import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.ENTRY; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.EXIT; + +import org.onap.dcaegen2.collectors.datafile.model.Counters; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; +import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +/** + * REST Controller to check the heart beat and status of the DFC. + */ +@RestController +@Api(value = "StatusController") +public class StatusController { + + private static final Logger logger = LoggerFactory.getLogger(StatusController.class); + + private final ScheduledTasks scheduledTasks; + + @Autowired + public StatusController(ScheduledTasks scheduledTasks) { + this.scheduledTasks = scheduledTasks; + } + + /** + * Checks the heart beat of DFC. + * + * @return the heart beat status of DFC. + */ + @GetMapping("/heartbeat") + @ApiOperation(value = "Returns liveness of DATAFILE service") + @ApiResponses(value = { // + @ApiResponse(code = 200, message = "DATAFILE service is living"), + @ApiResponse(code = 401, message = "You are not authorized to view the resource"), + @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), + @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")}) + public Mono> heartbeat(@RequestHeader HttpHeaders headers) { + MappedDiagnosticContext.initializeTraceContext(headers); + logger.info(ENTRY, "Heartbeat request"); + + String statusString = "I'm living!"; + + Mono> response = Mono.just(new ResponseEntity<>(statusString, HttpStatus.OK)); + logger.info(EXIT, "Heartbeat request"); + return response; + } + + /** + * Returns diagnostics and statistics information. It is intended for testing and trouble + * shooting. + * + * @return information. + */ + @GetMapping("/status") + @ApiOperation(value = "Returns status and statistics of DATAFILE service") + @ApiResponses(value = { // + @ApiResponse(code = 200, message = "DATAFILE service is living"), + @ApiResponse(code = 401, message = "You are not authorized to view the resource"), + @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), + @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")}) + public Mono> status(@RequestHeader HttpHeaders headers) { + MappedDiagnosticContext.initializeTraceContext(headers); + logger.info(ENTRY, "Status request"); + + Counters counters = scheduledTasks.getCounters(); + Mono> response = Mono.just(new ResponseEntity<>(counters.toString(), HttpStatus.OK)); + logger.info(EXIT, "Status request"); + return response; + } + + + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java index 38f74ed1..6aa76157 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -21,29 +21,13 @@ package org.onap.dcaegen2.collectors.datafile.exceptions; public class DatafileTaskException extends Exception { private static final long serialVersionUID = 1L; - private final boolean isRetryable; public DatafileTaskException(String message) { super(message); - isRetryable = true; - } - - public DatafileTaskException(String message, boolean retry) { - super(message); - isRetryable = retry; } public DatafileTaskException(String message, Exception originalException) { super(message, originalException); - isRetryable = true; } - public DatafileTaskException(String message, Exception originalException, boolean retry) { - super(message, originalException); - isRetryable = retry; - } - - public boolean isRetryable() { - return this.isRetryable; - } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java new file mode 100644 index 00000000..a4bdd667 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.exceptions; + + +public class NonRetryableDatafileTaskException extends DatafileTaskException { + + private static final long serialVersionUID = 1L; + + public NonRetryableDatafileTaskException(String message) { + super(message); + } + + public NonRetryableDatafileTaskException(String message, Exception originalException) { + super(message, originalException); + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index bb3016ce..18288603 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -38,6 +38,7 @@ import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPSClient; import org.apache.commons.net.util.KeyManagerUtils; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.FileSystemResource; @@ -119,8 +120,7 @@ public class FtpsClient implements FileCollectClient { try (OutputStream output = createOutputStream(localFileName)) { logger.trace("begin to retrieve from xNF."); if (!realFtpsClient.retrieveFile(remoteFileName, output)) { - final boolean retry = false; // Skip retrying for all problems except IOException - throw new DatafileTaskException("Could not retrieve file " + remoteFileName, retry); + throw new NonRetryableDatafileTaskException("Could not retrieve file. No retry attempts will be done, file :" + remoteFileName); } } catch (IOException e) { throw new DatafileTaskException("Could not fetch file: " + e, e); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index ec523354..f5d10d34 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -25,6 +25,7 @@ import com.jcraft.jsch.SftpException; import java.nio.file.Path; import java.util.Optional; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +58,12 @@ public class SftpClient implements FileCollectClient { } catch (SftpException e) { boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED; - throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry); + if (retry) { + throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e); + } else { + throw new NonRetryableDatafileTaskException( + "Unable to get file from xNF. No retry attempts will be done. Data: " + fileServerData, e); + } } logger.trace("collectFile OK"); @@ -85,7 +91,12 @@ public class SftpClient implements FileCollectClient { } } catch (JSchException e) { boolean retry = !e.getMessage().contains("Auth fail"); - throw new DatafileTaskException("Could not open Sftp client. " + e, e, retry); + if (retry) { + throw new DatafileTaskException("Could not open Sftp client. " + e); + } else { + throw new NonRetryableDatafileTaskException( + "Could not open Sftp client, no retry attempts will be done " + e); + } } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java new file mode 100644 index 00000000..5efbe37a --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * + * Various counters that can be shown via a REST API. + * + */ +public class Counters { + + private final AtomicInteger numberOfTasks = new AtomicInteger(); + private final AtomicInteger numberOfSubscriptions = new AtomicInteger(); + private int noOfCollectedFiles = 0; + private int noOfFailedFtpAttempts = 0; + private int noOfFailedFtp = 0; + private int noOfFailedPublishAttempts = 0; + private int totalPublishedFiles = 0; + private int noOfFailedPublish = 0; + private Instant lastPublishedTime = Instant.MIN; + private int totalReceivedEvents = 0; + private Instant lastEventTime = Instant.MIN; + + public AtomicInteger getCurrentNumberOfTasks() { + return numberOfTasks; + } + + public AtomicInteger getCurrentNumberOfSubscriptions() { + return numberOfSubscriptions; + } + + public synchronized void incNoOfReceivedEvents() { + totalReceivedEvents++; + lastEventTime = Instant.now(); + } + + public synchronized void incNoOfCollectedFiles() { + noOfCollectedFiles++; + } + + public synchronized void incNoOfFailedFtpAttempts() { + noOfFailedFtpAttempts++; + } + + public synchronized void incNoOfFailedFtp() { + noOfFailedFtp++; + } + + public synchronized void incNoOfFailedPublishAttempts() { + noOfFailedPublishAttempts++; + } + + public synchronized void incTotalPublishedFiles() { + totalPublishedFiles++; + lastPublishedTime = Instant.now(); + } + + public synchronized void incNoOfFailedPublish() { + noOfFailedPublish++; + } + + public synchronized String toString() { + StringBuilder str = new StringBuilder(); + str.append(format("totalReceivedEvents", totalReceivedEvents)); + str.append(format("lastEventTime", lastEventTime)); + str.append(format("numberOfTasks", numberOfTasks)); + str.append(format("numberOfSubscriptions", numberOfSubscriptions)); + str.append("\n"); + str.append(format("collectedFiles", noOfCollectedFiles)); + str.append(format("failedFtpAttempts", noOfFailedFtpAttempts)); + str.append(format("failedFtp", noOfFailedFtp)); + str.append("\n"); + str.append(format("totalPublishedFiles", totalPublishedFiles)); + str.append(format("lastPublishedTime", lastPublishedTime)); + + str.append(format("failedPublishAttempts", noOfFailedPublishAttempts)); + str.append(format("noOfFailedPublish", noOfFailedPublish)); + + return str.toString(); + } + + private String format(String name, Object value) { + String header = name + ":"; + return String.format("%-24s%-22s\n", header, value); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index d9efe802..02e153cf 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -22,19 +22,20 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; -import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; -import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.FileEntity; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; @@ -62,9 +63,11 @@ public class DataRouterPublisher { private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final AppConfig datafileAppConfig; + private final Counters counters; - public DataRouterPublisher(AppConfig datafileAppConfig) { + public DataRouterPublisher(AppConfig datafileAppConfig, Counters counters) { this.datafileAppConfig = datafileAppConfig; + this.counters = counters; } /** @@ -98,8 +101,10 @@ public class DataRouterPublisher { HttpResponse response = dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); logger.trace("{}", response); + counters.incTotalPublishedFiles(); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { + counters.incNoOfFailedPublishAttempts(); logger.warn("Publishing file {} to DR unsuccessful.", publishInfo.getName(), e); return Mono.error(e); } @@ -121,10 +126,9 @@ public class DataRouterPublisher { } private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException { - Path fileLocation = publishInfo.getInternalLocation(); - try (InputStream fileInputStream = createInputStream(fileLocation)) { - put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); - } + File file = createInputFile(publishInfo.getInternalLocation()); + FileEntity entity = new FileEntity(file, ContentType.DEFAULT_BINARY); + put.setEntity(entity); } private static Mono handleHttpResponse(HttpStatus response, @@ -140,9 +144,9 @@ public class DataRouterPublisher { } } - InputStream createInputStream(Path filePath) throws IOException { + File createInputFile(Path filePath) throws IOException { FileSystemResource realResource = new FileSystemResource(filePath); - return realResource.getInputStream(); + return realResource.getFile(); } PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index aeacaffc..311f752d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -24,9 +24,11 @@ import java.util.Optional; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation; @@ -45,14 +47,16 @@ public class FileCollector { private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); private final AppConfig datafileAppConfig; + private final Counters counters; /** * Constructor. * * @param datafileAppConfig application configuration */ - public FileCollector(AppConfig datafileAppConfig) { + public FileCollector(AppConfig datafileAppConfig, Counters counters) { this.datafileAppConfig = datafileAppConfig; + this.counters = counters; } /** @@ -97,14 +101,16 @@ public class FileCollector { currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); + counters.incNoOfCollectedFiles(); return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } catch (DatafileTaskException e) { logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), e.toString()); - if (e.isRetryable()) { - return Mono.error(e); - } else { + counters.incNoOfFailedFtpAttempts(); + if (e instanceof NonRetryableDatafileTaskException) { return Mono.just(Optional.empty()); // Give up + } else { + return Mono.error(e); } } catch (Exception throwable) { logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 99b2d918..300ca601 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -41,8 +42,8 @@ import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new - * files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { @@ -57,11 +58,12 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final AppConfig applicationConfiguration; - private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final AtomicInteger currentNumberOfTasks; private final AtomicInteger threadPoolQueueSize = new AtomicInteger(); - private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger(); + private final AtomicInteger currentNumberOfSubscriptions; private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); PublishedFileCache publishedFilesCache = new PublishedFileCache(); + private Counters counters = new Counters(); /** * Constructor for task registration in Datafile Workflow. @@ -71,6 +73,8 @@ public class ScheduledTasks { @Autowired public ScheduledTasks(AppConfig applicationConfiguration) { this.applicationConfiguration = applicationConfiguration; + this.currentNumberOfTasks = counters.getCurrentNumberOfTasks(); + this.currentNumberOfSubscriptions = counters.getCurrentNumberOfSubscriptions(); } /** @@ -112,6 +116,7 @@ public class ScheduledTasks { Flux createMainTask(Map context) { return fetchMoreFileReadyMessages() // .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) // + .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) // .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // @@ -148,32 +153,21 @@ public class ScheduledTasks { return new PublishedChecker(applicationConfiguration); } - public int getCurrentNumberOfTasks() { - return currentNumberOfTasks.get(); + public Counters getCounters() { + return this.counters; } - public int publishedFilesCacheSize() { - return publishedFilesCache.size(); - } - - public int getCurrentNumberOfSubscriptions() { - return currentNumberOfSubscriptions.get(); - } - - public int getThreadPoolQueueSize() { - return this.threadPoolQueueSize.get(); - } protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { return new DMaaPMessageConsumer(this.applicationConfiguration); } protected FileCollector createFileCollector() { - return new FileCollector(applicationConfiguration); + return new FileCollector(applicationConfiguration, counters); } protected DataRouterPublisher createDataRouterPublisher() { - return new DataRouterPublisher(applicationConfiguration); + return new DataRouterPublisher(applicationConfiguration, counters); } private static void onComplete(Map contextMap) { @@ -181,6 +175,22 @@ public class ScheduledTasks { logger.trace("Datafile tasks have been completed"); } + int publishedFilesCacheSize() { + return publishedFilesCache.size(); + } + + int getCurrentNumberOfTasks() { + return this.currentNumberOfTasks.get(); + } + + int getCurrentNumberOfSubscriptions() { + return this.currentNumberOfSubscriptions.get(); + } + + int getThreadPoolQueueSize() { + return this.threadPoolQueueSize.get(); + } + private static synchronized void onSuccess(FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); logger.info("Datafile file published {}", publishInfo.getInternalLocation()); @@ -239,6 +249,7 @@ public class ScheduledTasks { deleteFile(localFilePath, fileData.context); publishedFilesCache.remove(localFilePath); currentNumberOfTasks.decrementAndGet(); + counters.incNoOfFailedFtp(); return Mono.empty(); } @@ -257,6 +268,7 @@ public class ScheduledTasks { deleteFile(internalFileName, publishInfo.getContext()); publishedFilesCache.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); + counters.incNoOfFailedPublish(); return Mono.empty(); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java deleted file mode 100644 index 012a6b3d..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2019 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.controller; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.read.ListAppender; -import org.apache.commons.lang3.StringUtils; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.collectors.datafile.controllers.HeartbeatController; -import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; -import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; -import org.slf4j.MDC; -import org.springframework.http.HttpHeaders; -import org.springframework.http.ResponseEntity; -import reactor.core.publisher.Mono; - -public class HeartbeatControllerTest { - @Test - public void heartbeat_success() { - ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); - when(scheduledTasksMock.getCurrentNumberOfTasks()).thenReturn(10); - when(scheduledTasksMock.publishedFilesCacheSize()).thenReturn(20); - - HttpHeaders httpHeaders = new HttpHeaders(); - - HeartbeatController controllerUnderTest = new HeartbeatController(scheduledTasksMock); - - ListAppender logAppender = LoggingUtils.getLogListAppender(HeartbeatController.class); - Mono> result = controllerUnderTest.heartbeat(httpHeaders); - - validateLogging(logAppender); - - String body = result.block().getBody(); - assertTrue(body.startsWith("I'm living! Status: ")); - assertTrue(body.contains("numberOfFileCollectionTasks=10")); - assertTrue(body.contains("fileCacheSize=20")); - - assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID))); - assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID))); - } - - private void validateLogging(ListAppender logAppender) { - assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY"); - assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID")); - assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID")); - assertTrue("Info missing in log", logAppender.list.toString().contains("[INFO] Heartbeat request")); - assertEquals(logAppender.list.get(1).getMarker().getName(), "EXIT"); - logAppender.stop(); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java new file mode 100644 index 00000000..51097f52 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java @@ -0,0 +1,95 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.controller; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.controllers.StatusController; +import org.onap.dcaegen2.collectors.datafile.model.Counters; +import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; +import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; +import org.slf4j.MDC; +import org.springframework.http.HttpHeaders; +import org.springframework.http.ResponseEntity; +import reactor.core.publisher.Mono; + +public class StatusControllerTest { + @Test + public void heartbeat_success() { + ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); + + HttpHeaders httpHeaders = new HttpHeaders(); + + StatusController controllerUnderTest = new StatusController(scheduledTasksMock); + + ListAppender logAppender = LoggingUtils.getLogListAppender(StatusController.class); + Mono> result = controllerUnderTest.heartbeat(httpHeaders); + + validateLogging(logAppender); + + String body = result.block().getBody(); + assertTrue(body.startsWith("I'm living!")); + + assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID))); + assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID))); + } + + + @Test + public void status() { + ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); + Counters counters = new Counters(); + doReturn(counters).when(scheduledTasksMock).getCounters(); + + HttpHeaders httpHeaders = new HttpHeaders(); + + StatusController controllerUnderTest = new StatusController(scheduledTasksMock); + + Mono> result = controllerUnderTest.status(httpHeaders); + + String body = result.block().getBody(); + System.out.println(body); + + assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID))); + assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID))); + } + + + private void validateLogging(ListAppender logAppender) { + assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY"); + assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID")); + assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID")); + assertTrue("Info missing in log", logAppender.list.toString().contains("[INFO] Heartbeat request")); + assertEquals(logAppender.list.get(1).getMarker().getName(), "EXIT"); + logAppender.stop(); + } +} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java index e0182560..f4e814f4 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java @@ -205,7 +205,8 @@ public class FtpsClientTest { doReturn(false).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock); assertThatThrownBy(() -> clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Could not retrieve file /dir/sample.txt"); + .hasMessageContaining(REMOTE_FILE_PATH) + .hasMessageContaining("No retry"); verifyFtpsClientMock_openOk(); verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any()); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java index cb3735be..693806c2 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java @@ -121,8 +121,7 @@ public class SftpClientTest { doReturn(jschMock).when(sftpClientSpy).createJsch(); when(jschMock.getSession(anyString(), anyString(), anyInt())).thenThrow(new JSchException("Failed")); - assertThatThrownBy(() -> sftpClientSpy.open()) - .hasMessageStartingWith("Could not open Sftp client. com.jcraft.jsch.JSchException: Failed"); + assertThatThrownBy(() -> sftpClientSpy.open()).hasMessageStartingWith("Could not open Sftp client."); } @SuppressWarnings("resource") @@ -161,8 +160,9 @@ public class SftpClientTest { assertThatThrownBy(() -> sftpClient.collectFile("remoteFile", Paths.get("localFile"))) .isInstanceOf(DatafileTaskException.class) - .hasMessageStartingWith("Unable to get file from xNF. Data: FileServerData{serverAddress=" + HOST - + ", " + "userId=" + USERNAME + ", password=####, port=" + SFTP_PORT); + .hasMessageStartingWith("Unable to get file from xNF. No retry attempts will be done") + .hasMessageContaining("Data: FileServerData{serverAddress=" + HOST + ", " + "userId=" + USERNAME + + ", password=####, port=" + SFTP_PORT); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 4da22cbf..6a9dccda 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -27,10 +27,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.io.File; + import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; -import java.io.ByteArrayInputStream; -import java.io.InputStream; import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; @@ -50,6 +50,7 @@ import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; @@ -86,7 +87,6 @@ class DataRouterPublisherTest { private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; private static final String PUBLISH_TOPIC = "publish"; private static final String FEED_ID = "1"; - private static final String FILE_CONTENT = "Just a string."; private static FilePublishInformation filePublishInformation; private static DmaapProducerHttpClient httpClientMock; @@ -120,7 +120,7 @@ class DataRouterPublisherTest { .changeIdentifier(CHANGE_IDENTIFIER) // .build(); // appConfig = mock(AppConfig.class); - publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig)); + publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig, new Counters())); } @Test @@ -236,8 +236,8 @@ class DataRouterPublisherTest { when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock); when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses); - InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes()); - doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", PM_FILE_NAME)); + File file = File.createTempFile("DFC", "tmp"); + doReturn(file).when(publisherTaskUnderTestSpy).createInputFile(Paths.get("target", PM_FILE_NAME)); } private Map getMetaDataAsMap(Header[] metaHeaders) { diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index 299a0238..99e92bd2 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -37,9 +37,11 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; @@ -93,6 +95,7 @@ public class FileCollectorTest { private SftpClient sftpClientMock = mock(SftpClient.class); private final Map contextMap = new HashMap<>(); + private final Counters counters = new Counters(); private MessageMetaData createMessageMetaData() { return ImmutableMessageMetaData.builder() // @@ -133,7 +136,7 @@ public class FileCollectorTest { .compression(GZIP_COMPRESSION) // .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // - .context(new HashMap()) // + .context(new HashMap()) // .changeIdentifier(CHANGE_IDENTIFIER) // .build(); } @@ -152,7 +155,7 @@ public class FileCollectorTest { @Test public void whenFtpesFile_returnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS); @@ -173,7 +176,7 @@ public class FileCollectorTest { @Test public void whenSftpFile_returnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any()); @@ -201,7 +204,7 @@ public class FileCollectorTest { @Test public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS); @@ -217,12 +220,11 @@ public class FileCollectorTest { @Test public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, new Counters())); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); - final boolean retry = false; FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS); - doThrow(new DatafileTaskException("Unable to collect file.", retry)).when(ftpsClientMock) + doThrow(new NonRetryableDatafileTaskException("Unable to collect file.")).when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap)) @@ -234,7 +236,7 @@ public class FileCollectorTest { @Test public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); -- cgit 1.2.3-korg