diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2019-06-14 06:38:21 +0000 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2019-06-14 06:38:21 +0000 |
commit | dafd553cf1694585b35fd7132c6bafdef1e98ed6 (patch) | |
tree | 7882694255ee951de06e9e3060af508c9e184385 /datafile-app-server | |
parent | addf3f10dd0cc1d0797151633839c65276af8b1c (diff) |
Bugfix, improved behaviour for large files
Previously files was read into a buffer for publishing.
This does not work when files are bigger than the available memory.
After the fix , files are streamed instead.
Implemented a new REST primitive for exposing status and
statistics. To be used for test and trouble shooting.
Change-Id: Iab5a1ee9ffcbf6836fcf709d115bf25ab0391732
Issue-ID: DCAEGEN2-1532
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server')
14 files changed, 300 insertions, 97 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java index c6d56c42..40f9d99b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java @@ -19,6 +19,8 @@ package org.onap.dcaegen2.collectors.datafile.controllers; import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.ENTRY; import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.EXIT; +import org.onap.dcaegen2.collectors.datafile.model.Counters; + import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -37,28 +39,25 @@ import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; /** - * Controller to check the heartbeat of DFC. - * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * REST Controller to check the heart beat and status of the DFC. */ @RestController -@Api(value = "HeartbeatController") -public class HeartbeatController { +@Api(value = "StatusController") +public class StatusController { - private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class); + private static final Logger logger = LoggerFactory.getLogger(StatusController.class); private final ScheduledTasks scheduledTasks; @Autowired - public HeartbeatController(ScheduledTasks scheduledTasks) { + public StatusController(ScheduledTasks scheduledTasks) { this.scheduledTasks = scheduledTasks; } /** - * Checks the heartbeat of DFC. + * Checks the heart beat of DFC. * - * @return the heartbeat status of DFC. + * @return the heart beat status of DFC. */ @GetMapping("/heartbeat") @ApiOperation(value = "Returns liveness of DATAFILE service") @@ -66,18 +65,41 @@ public class HeartbeatController { @ApiResponse(code = 200, message = "DATAFILE service is living"), @ApiResponse(code = 401, message = "You are not authorized to view the resource"), @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), - @ApiResponse(code = 404, message = "The resource you were trying to reach is not found") }) + @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")}) public Mono<ResponseEntity<String>> heartbeat(@RequestHeader HttpHeaders headers) { MappedDiagnosticContext.initializeTraceContext(headers); logger.info(ENTRY, "Heartbeat request"); - StringBuilder statusString = new StringBuilder("I'm living! Status: "); - statusString.append("numberOfFileCollectionTasks=").append(scheduledTasks.getCurrentNumberOfTasks()) - .append(","); - statusString.append("fileCacheSize=").append(scheduledTasks.publishedFilesCacheSize()); + String statusString = "I'm living!"; - Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(statusString.toString(), HttpStatus.OK)); + Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(statusString, HttpStatus.OK)); logger.info(EXIT, "Heartbeat request"); return response; } + + /** + * Returns diagnostics and statistics information. It is intended for testing and trouble + * shooting. + * + * @return information. + */ + @GetMapping("/status") + @ApiOperation(value = "Returns status and statistics of DATAFILE service") + @ApiResponses(value = { // + @ApiResponse(code = 200, message = "DATAFILE service is living"), + @ApiResponse(code = 401, message = "You are not authorized to view the resource"), + @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), + @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")}) + public Mono<ResponseEntity<String>> status(@RequestHeader HttpHeaders headers) { + MappedDiagnosticContext.initializeTraceContext(headers); + logger.info(ENTRY, "Status request"); + + Counters counters = scheduledTasks.getCounters(); + Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(counters.toString(), HttpStatus.OK)); + logger.info(EXIT, "Status request"); + return response; + } + + + } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java index 38f74ed1..6aa76157 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -21,29 +21,13 @@ package org.onap.dcaegen2.collectors.datafile.exceptions; public class DatafileTaskException extends Exception { private static final long serialVersionUID = 1L; - private final boolean isRetryable; public DatafileTaskException(String message) { super(message); - isRetryable = true; - } - - public DatafileTaskException(String message, boolean retry) { - super(message); - isRetryable = retry; } public DatafileTaskException(String message, Exception originalException) { super(message, originalException); - isRetryable = true; } - public DatafileTaskException(String message, Exception originalException, boolean retry) { - super(message, originalException); - isRetryable = retry; - } - - public boolean isRetryable() { - return this.isRetryable; - } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java new file mode 100644 index 00000000..a4bdd667 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.exceptions; + + +public class NonRetryableDatafileTaskException extends DatafileTaskException { + + private static final long serialVersionUID = 1L; + + public NonRetryableDatafileTaskException(String message) { + super(message); + } + + public NonRetryableDatafileTaskException(String message, Exception originalException) { + super(message, originalException); + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index bb3016ce..18288603 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -38,6 +38,7 @@ import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPSClient; import org.apache.commons.net.util.KeyManagerUtils; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.FileSystemResource; @@ -119,8 +120,7 @@ public class FtpsClient implements FileCollectClient { try (OutputStream output = createOutputStream(localFileName)) { logger.trace("begin to retrieve from xNF."); if (!realFtpsClient.retrieveFile(remoteFileName, output)) { - final boolean retry = false; // Skip retrying for all problems except IOException - throw new DatafileTaskException("Could not retrieve file " + remoteFileName, retry); + throw new NonRetryableDatafileTaskException("Could not retrieve file. No retry attempts will be done, file :" + remoteFileName); } } catch (IOException e) { throw new DatafileTaskException("Could not fetch file: " + e, e); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index ec523354..f5d10d34 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -25,6 +25,7 @@ import com.jcraft.jsch.SftpException; import java.nio.file.Path; import java.util.Optional; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +58,12 @@ public class SftpClient implements FileCollectClient { } catch (SftpException e) { boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED; - throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry); + if (retry) { + throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e); + } else { + throw new NonRetryableDatafileTaskException( + "Unable to get file from xNF. No retry attempts will be done. Data: " + fileServerData, e); + } } logger.trace("collectFile OK"); @@ -85,7 +91,12 @@ public class SftpClient implements FileCollectClient { } } catch (JSchException e) { boolean retry = !e.getMessage().contains("Auth fail"); - throw new DatafileTaskException("Could not open Sftp client. " + e, e, retry); + if (retry) { + throw new DatafileTaskException("Could not open Sftp client. " + e); + } else { + throw new NonRetryableDatafileTaskException( + "Could not open Sftp client, no retry attempts will be done " + e); + } } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java new file mode 100644 index 00000000..5efbe37a --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * + * Various counters that can be shown via a REST API. + * + */ +public class Counters { + + private final AtomicInteger numberOfTasks = new AtomicInteger(); + private final AtomicInteger numberOfSubscriptions = new AtomicInteger(); + private int noOfCollectedFiles = 0; + private int noOfFailedFtpAttempts = 0; + private int noOfFailedFtp = 0; + private int noOfFailedPublishAttempts = 0; + private int totalPublishedFiles = 0; + private int noOfFailedPublish = 0; + private Instant lastPublishedTime = Instant.MIN; + private int totalReceivedEvents = 0; + private Instant lastEventTime = Instant.MIN; + + public AtomicInteger getCurrentNumberOfTasks() { + return numberOfTasks; + } + + public AtomicInteger getCurrentNumberOfSubscriptions() { + return numberOfSubscriptions; + } + + public synchronized void incNoOfReceivedEvents() { + totalReceivedEvents++; + lastEventTime = Instant.now(); + } + + public synchronized void incNoOfCollectedFiles() { + noOfCollectedFiles++; + } + + public synchronized void incNoOfFailedFtpAttempts() { + noOfFailedFtpAttempts++; + } + + public synchronized void incNoOfFailedFtp() { + noOfFailedFtp++; + } + + public synchronized void incNoOfFailedPublishAttempts() { + noOfFailedPublishAttempts++; + } + + public synchronized void incTotalPublishedFiles() { + totalPublishedFiles++; + lastPublishedTime = Instant.now(); + } + + public synchronized void incNoOfFailedPublish() { + noOfFailedPublish++; + } + + public synchronized String toString() { + StringBuilder str = new StringBuilder(); + str.append(format("totalReceivedEvents", totalReceivedEvents)); + str.append(format("lastEventTime", lastEventTime)); + str.append(format("numberOfTasks", numberOfTasks)); + str.append(format("numberOfSubscriptions", numberOfSubscriptions)); + str.append("\n"); + str.append(format("collectedFiles", noOfCollectedFiles)); + str.append(format("failedFtpAttempts", noOfFailedFtpAttempts)); + str.append(format("failedFtp", noOfFailedFtp)); + str.append("\n"); + str.append(format("totalPublishedFiles", totalPublishedFiles)); + str.append(format("lastPublishedTime", lastPublishedTime)); + + str.append(format("failedPublishAttempts", noOfFailedPublishAttempts)); + str.append(format("noOfFailedPublish", noOfFailedPublish)); + + return str.toString(); + } + + private String format(String name, Object value) { + String header = name + ":"; + return String.format("%-24s%-22s\n", header, value); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index d9efe802..02e153cf 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -22,19 +22,20 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; -import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; -import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.FileEntity; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; @@ -62,9 +63,11 @@ public class DataRouterPublisher { private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final AppConfig datafileAppConfig; + private final Counters counters; - public DataRouterPublisher(AppConfig datafileAppConfig) { + public DataRouterPublisher(AppConfig datafileAppConfig, Counters counters) { this.datafileAppConfig = datafileAppConfig; + this.counters = counters; } /** @@ -98,8 +101,10 @@ public class DataRouterPublisher { HttpResponse response = dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); logger.trace("{}", response); + counters.incTotalPublishedFiles(); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { + counters.incNoOfFailedPublishAttempts(); logger.warn("Publishing file {} to DR unsuccessful.", publishInfo.getName(), e); return Mono.error(e); } @@ -121,10 +126,9 @@ public class DataRouterPublisher { } private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException { - Path fileLocation = publishInfo.getInternalLocation(); - try (InputStream fileInputStream = createInputStream(fileLocation)) { - put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); - } + File file = createInputFile(publishInfo.getInternalLocation()); + FileEntity entity = new FileEntity(file, ContentType.DEFAULT_BINARY); + put.setEntity(entity); } private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, @@ -140,9 +144,9 @@ public class DataRouterPublisher { } } - InputStream createInputStream(Path filePath) throws IOException { + File createInputFile(Path filePath) throws IOException { FileSystemResource realResource = new FileSystemResource(filePath); - return realResource.getInputStream(); + return realResource.getFile(); } PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index aeacaffc..311f752d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -24,9 +24,11 @@ import java.util.Optional; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation; @@ -45,14 +47,16 @@ public class FileCollector { private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); private final AppConfig datafileAppConfig; + private final Counters counters; /** * Constructor. * * @param datafileAppConfig application configuration */ - public FileCollector(AppConfig datafileAppConfig) { + public FileCollector(AppConfig datafileAppConfig, Counters counters) { this.datafileAppConfig = datafileAppConfig; + this.counters = counters; } /** @@ -97,14 +101,16 @@ public class FileCollector { currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); + counters.incNoOfCollectedFiles(); return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } catch (DatafileTaskException e) { logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), e.toString()); - if (e.isRetryable()) { - return Mono.error(e); - } else { + counters.incNoOfFailedFtpAttempts(); + if (e instanceof NonRetryableDatafileTaskException) { return Mono.just(Optional.empty()); // Give up + } else { + return Mono.error(e); } } catch (Exception throwable) { logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 99b2d918..300ca601 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -41,8 +42,8 @@ import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new - * files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { @@ -57,11 +58,12 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final AppConfig applicationConfiguration; - private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final AtomicInteger currentNumberOfTasks; private final AtomicInteger threadPoolQueueSize = new AtomicInteger(); - private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger(); + private final AtomicInteger currentNumberOfSubscriptions; private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); PublishedFileCache publishedFilesCache = new PublishedFileCache(); + private Counters counters = new Counters(); /** * Constructor for task registration in Datafile Workflow. @@ -71,6 +73,8 @@ public class ScheduledTasks { @Autowired public ScheduledTasks(AppConfig applicationConfiguration) { this.applicationConfiguration = applicationConfiguration; + this.currentNumberOfTasks = counters.getCurrentNumberOfTasks(); + this.currentNumberOfSubscriptions = counters.getCurrentNumberOfSubscriptions(); } /** @@ -112,6 +116,7 @@ public class ScheduledTasks { Flux<FilePublishInformation> createMainTask(Map<String, String> context) { return fetchMoreFileReadyMessages() // .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) // + .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) // .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // @@ -148,32 +153,21 @@ public class ScheduledTasks { return new PublishedChecker(applicationConfiguration); } - public int getCurrentNumberOfTasks() { - return currentNumberOfTasks.get(); + public Counters getCounters() { + return this.counters; } - public int publishedFilesCacheSize() { - return publishedFilesCache.size(); - } - - public int getCurrentNumberOfSubscriptions() { - return currentNumberOfSubscriptions.get(); - } - - public int getThreadPoolQueueSize() { - return this.threadPoolQueueSize.get(); - } protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { return new DMaaPMessageConsumer(this.applicationConfiguration); } protected FileCollector createFileCollector() { - return new FileCollector(applicationConfiguration); + return new FileCollector(applicationConfiguration, counters); } protected DataRouterPublisher createDataRouterPublisher() { - return new DataRouterPublisher(applicationConfiguration); + return new DataRouterPublisher(applicationConfiguration, counters); } private static void onComplete(Map<String, String> contextMap) { @@ -181,6 +175,22 @@ public class ScheduledTasks { logger.trace("Datafile tasks have been completed"); } + int publishedFilesCacheSize() { + return publishedFilesCache.size(); + } + + int getCurrentNumberOfTasks() { + return this.currentNumberOfTasks.get(); + } + + int getCurrentNumberOfSubscriptions() { + return this.currentNumberOfSubscriptions.get(); + } + + int getThreadPoolQueueSize() { + return this.threadPoolQueueSize.get(); + } + private static synchronized void onSuccess(FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); logger.info("Datafile file published {}", publishInfo.getInternalLocation()); @@ -239,6 +249,7 @@ public class ScheduledTasks { deleteFile(localFilePath, fileData.context); publishedFilesCache.remove(localFilePath); currentNumberOfTasks.decrementAndGet(); + counters.incNoOfFailedFtp(); return Mono.empty(); } @@ -257,6 +268,7 @@ public class ScheduledTasks { deleteFile(internalFileName, publishInfo.getContext()); publishedFilesCache.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); + counters.incNoOfFailedPublish(); return Mono.empty(); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java index 012a6b3d..51097f52 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -31,7 +32,8 @@ import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.collectors.datafile.controllers.HeartbeatController; +import org.onap.dcaegen2.collectors.datafile.controllers.StatusController; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; @@ -40,31 +42,48 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.ResponseEntity; import reactor.core.publisher.Mono; -public class HeartbeatControllerTest { +public class StatusControllerTest { @Test public void heartbeat_success() { ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); - when(scheduledTasksMock.getCurrentNumberOfTasks()).thenReturn(10); - when(scheduledTasksMock.publishedFilesCacheSize()).thenReturn(20); HttpHeaders httpHeaders = new HttpHeaders(); - HeartbeatController controllerUnderTest = new HeartbeatController(scheduledTasksMock); + StatusController controllerUnderTest = new StatusController(scheduledTasksMock); - ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(HeartbeatController.class); + ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(StatusController.class); Mono<ResponseEntity<String>> result = controllerUnderTest.heartbeat(httpHeaders); validateLogging(logAppender); String body = result.block().getBody(); - assertTrue(body.startsWith("I'm living! Status: ")); - assertTrue(body.contains("numberOfFileCollectionTasks=10")); - assertTrue(body.contains("fileCacheSize=20")); + assertTrue(body.startsWith("I'm living!")); assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID))); assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID))); } + + @Test + public void status() { + ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); + Counters counters = new Counters(); + doReturn(counters).when(scheduledTasksMock).getCounters(); + + HttpHeaders httpHeaders = new HttpHeaders(); + + StatusController controllerUnderTest = new StatusController(scheduledTasksMock); + + Mono<ResponseEntity<String>> result = controllerUnderTest.status(httpHeaders); + + String body = result.block().getBody(); + System.out.println(body); + + assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID))); + assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID))); + } + + private void validateLogging(ListAppender<ILoggingEvent> logAppender) { assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY"); assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID")); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java index e0182560..f4e814f4 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java @@ -205,7 +205,8 @@ public class FtpsClientTest { doReturn(false).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock); assertThatThrownBy(() -> clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Could not retrieve file /dir/sample.txt"); + .hasMessageContaining(REMOTE_FILE_PATH) + .hasMessageContaining("No retry"); verifyFtpsClientMock_openOk(); verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any()); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java index cb3735be..693806c2 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java @@ -121,8 +121,7 @@ public class SftpClientTest { doReturn(jschMock).when(sftpClientSpy).createJsch(); when(jschMock.getSession(anyString(), anyString(), anyInt())).thenThrow(new JSchException("Failed")); - assertThatThrownBy(() -> sftpClientSpy.open()) - .hasMessageStartingWith("Could not open Sftp client. com.jcraft.jsch.JSchException: Failed"); + assertThatThrownBy(() -> sftpClientSpy.open()).hasMessageStartingWith("Could not open Sftp client."); } @SuppressWarnings("resource") @@ -161,8 +160,9 @@ public class SftpClientTest { assertThatThrownBy(() -> sftpClient.collectFile("remoteFile", Paths.get("localFile"))) .isInstanceOf(DatafileTaskException.class) - .hasMessageStartingWith("Unable to get file from xNF. Data: FileServerData{serverAddress=" + HOST - + ", " + "userId=" + USERNAME + ", password=####, port=" + SFTP_PORT); + .hasMessageStartingWith("Unable to get file from xNF. No retry attempts will be done") + .hasMessageContaining("Data: FileServerData{serverAddress=" + HOST + ", " + "userId=" + USERNAME + + ", password=####, port=" + SFTP_PORT); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 4da22cbf..6a9dccda 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -27,10 +27,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.io.File; + import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; -import java.io.ByteArrayInputStream; -import java.io.InputStream; import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; @@ -50,6 +50,7 @@ import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; @@ -86,7 +87,6 @@ class DataRouterPublisherTest { private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; private static final String PUBLISH_TOPIC = "publish"; private static final String FEED_ID = "1"; - private static final String FILE_CONTENT = "Just a string."; private static FilePublishInformation filePublishInformation; private static DmaapProducerHttpClient httpClientMock; @@ -120,7 +120,7 @@ class DataRouterPublisherTest { .changeIdentifier(CHANGE_IDENTIFIER) // .build(); // appConfig = mock(AppConfig.class); - publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig)); + publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig, new Counters())); } @Test @@ -236,8 +236,8 @@ class DataRouterPublisherTest { when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock); when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses); - InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes()); - doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", PM_FILE_NAME)); + File file = File.createTempFile("DFC", "tmp"); + doReturn(file).when(publisherTaskUnderTestSpy).createInputFile(Paths.get("target", PM_FILE_NAME)); } private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) { diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index 299a0238..99e92bd2 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -37,9 +37,11 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; @@ -93,6 +95,7 @@ public class FileCollectorTest { private SftpClient sftpClientMock = mock(SftpClient.class); private final Map<String, String> contextMap = new HashMap<>(); + private final Counters counters = new Counters(); private MessageMetaData createMessageMetaData() { return ImmutableMessageMetaData.builder() // @@ -133,7 +136,7 @@ public class FileCollectorTest { .compression(GZIP_COMPRESSION) // .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // - .context(new HashMap<String,String>()) // + .context(new HashMap<String, String>()) // .changeIdentifier(CHANGE_IDENTIFIER) // .build(); } @@ -152,7 +155,7 @@ public class FileCollectorTest { @Test public void whenFtpesFile_returnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS); @@ -173,7 +176,7 @@ public class FileCollectorTest { @Test public void whenSftpFile_returnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any()); @@ -201,7 +204,7 @@ public class FileCollectorTest { @Test public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS); @@ -217,12 +220,11 @@ public class FileCollectorTest { @Test public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, new Counters())); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); - final boolean retry = false; FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS); - doThrow(new DatafileTaskException("Unable to collect file.", retry)).when(ftpsClientMock) + doThrow(new NonRetryableDatafileTaskException("Unable to collect file.")).when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap)) @@ -234,7 +236,7 @@ public class FileCollectorTest { @Test public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); |