diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2019-06-14 06:38:21 +0000 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2019-06-14 06:38:21 +0000 |
commit | dafd553cf1694585b35fd7132c6bafdef1e98ed6 (patch) | |
tree | 7882694255ee951de06e9e3060af508c9e184385 /datafile-app-server/src/main/java/org | |
parent | addf3f10dd0cc1d0797151633839c65276af8b1c (diff) |
Bugfix, improved behaviour for large files
Previously files was read into a buffer for publishing.
This does not work when files are bigger than the available memory.
After the fix , files are streamed instead.
Implemented a new REST primitive for exposing status and
statistics. To be used for test and trouble shooting.
Change-Id: Iab5a1ee9ffcbf6836fcf709d115bf25ab0391732
Issue-ID: DCAEGEN2-1532
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src/main/java/org')
9 files changed, 250 insertions, 69 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java index c6d56c42..40f9d99b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java @@ -19,6 +19,8 @@ package org.onap.dcaegen2.collectors.datafile.controllers; import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.ENTRY; import static org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext.EXIT; +import org.onap.dcaegen2.collectors.datafile.model.Counters; + import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -37,28 +39,25 @@ import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; /** - * Controller to check the heartbeat of DFC. - * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * REST Controller to check the heart beat and status of the DFC. */ @RestController -@Api(value = "HeartbeatController") -public class HeartbeatController { +@Api(value = "StatusController") +public class StatusController { - private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class); + private static final Logger logger = LoggerFactory.getLogger(StatusController.class); private final ScheduledTasks scheduledTasks; @Autowired - public HeartbeatController(ScheduledTasks scheduledTasks) { + public StatusController(ScheduledTasks scheduledTasks) { this.scheduledTasks = scheduledTasks; } /** - * Checks the heartbeat of DFC. + * Checks the heart beat of DFC. * - * @return the heartbeat status of DFC. + * @return the heart beat status of DFC. */ @GetMapping("/heartbeat") @ApiOperation(value = "Returns liveness of DATAFILE service") @@ -66,18 +65,41 @@ public class HeartbeatController { @ApiResponse(code = 200, message = "DATAFILE service is living"), @ApiResponse(code = 401, message = "You are not authorized to view the resource"), @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), - @ApiResponse(code = 404, message = "The resource you were trying to reach is not found") }) + @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")}) public Mono<ResponseEntity<String>> heartbeat(@RequestHeader HttpHeaders headers) { MappedDiagnosticContext.initializeTraceContext(headers); logger.info(ENTRY, "Heartbeat request"); - StringBuilder statusString = new StringBuilder("I'm living! Status: "); - statusString.append("numberOfFileCollectionTasks=").append(scheduledTasks.getCurrentNumberOfTasks()) - .append(","); - statusString.append("fileCacheSize=").append(scheduledTasks.publishedFilesCacheSize()); + String statusString = "I'm living!"; - Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(statusString.toString(), HttpStatus.OK)); + Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(statusString, HttpStatus.OK)); logger.info(EXIT, "Heartbeat request"); return response; } + + /** + * Returns diagnostics and statistics information. It is intended for testing and trouble + * shooting. + * + * @return information. + */ + @GetMapping("/status") + @ApiOperation(value = "Returns status and statistics of DATAFILE service") + @ApiResponses(value = { // + @ApiResponse(code = 200, message = "DATAFILE service is living"), + @ApiResponse(code = 401, message = "You are not authorized to view the resource"), + @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), + @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")}) + public Mono<ResponseEntity<String>> status(@RequestHeader HttpHeaders headers) { + MappedDiagnosticContext.initializeTraceContext(headers); + logger.info(ENTRY, "Status request"); + + Counters counters = scheduledTasks.getCounters(); + Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(counters.toString(), HttpStatus.OK)); + logger.info(EXIT, "Status request"); + return response; + } + + + } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java index 38f74ed1..6aa76157 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -21,29 +21,13 @@ package org.onap.dcaegen2.collectors.datafile.exceptions; public class DatafileTaskException extends Exception { private static final long serialVersionUID = 1L; - private final boolean isRetryable; public DatafileTaskException(String message) { super(message); - isRetryable = true; - } - - public DatafileTaskException(String message, boolean retry) { - super(message); - isRetryable = retry; } public DatafileTaskException(String message, Exception originalException) { super(message, originalException); - isRetryable = true; } - public DatafileTaskException(String message, Exception originalException, boolean retry) { - super(message, originalException); - isRetryable = retry; - } - - public boolean isRetryable() { - return this.isRetryable; - } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java new file mode 100644 index 00000000..a4bdd667 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.exceptions; + + +public class NonRetryableDatafileTaskException extends DatafileTaskException { + + private static final long serialVersionUID = 1L; + + public NonRetryableDatafileTaskException(String message) { + super(message); + } + + public NonRetryableDatafileTaskException(String message, Exception originalException) { + super(message, originalException); + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index bb3016ce..18288603 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -38,6 +38,7 @@ import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPSClient; import org.apache.commons.net.util.KeyManagerUtils; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.FileSystemResource; @@ -119,8 +120,7 @@ public class FtpsClient implements FileCollectClient { try (OutputStream output = createOutputStream(localFileName)) { logger.trace("begin to retrieve from xNF."); if (!realFtpsClient.retrieveFile(remoteFileName, output)) { - final boolean retry = false; // Skip retrying for all problems except IOException - throw new DatafileTaskException("Could not retrieve file " + remoteFileName, retry); + throw new NonRetryableDatafileTaskException("Could not retrieve file. No retry attempts will be done, file :" + remoteFileName); } } catch (IOException e) { throw new DatafileTaskException("Could not fetch file: " + e, e); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index ec523354..f5d10d34 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -25,6 +25,7 @@ import com.jcraft.jsch.SftpException; import java.nio.file.Path; import java.util.Optional; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +58,12 @@ public class SftpClient implements FileCollectClient { } catch (SftpException e) { boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED; - throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry); + if (retry) { + throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e); + } else { + throw new NonRetryableDatafileTaskException( + "Unable to get file from xNF. No retry attempts will be done. Data: " + fileServerData, e); + } } logger.trace("collectFile OK"); @@ -85,7 +91,12 @@ public class SftpClient implements FileCollectClient { } } catch (JSchException e) { boolean retry = !e.getMessage().contains("Auth fail"); - throw new DatafileTaskException("Could not open Sftp client. " + e, e, retry); + if (retry) { + throw new DatafileTaskException("Could not open Sftp client. " + e); + } else { + throw new NonRetryableDatafileTaskException( + "Could not open Sftp client, no retry attempts will be done " + e); + } } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java new file mode 100644 index 00000000..5efbe37a --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * + * Various counters that can be shown via a REST API. + * + */ +public class Counters { + + private final AtomicInteger numberOfTasks = new AtomicInteger(); + private final AtomicInteger numberOfSubscriptions = new AtomicInteger(); + private int noOfCollectedFiles = 0; + private int noOfFailedFtpAttempts = 0; + private int noOfFailedFtp = 0; + private int noOfFailedPublishAttempts = 0; + private int totalPublishedFiles = 0; + private int noOfFailedPublish = 0; + private Instant lastPublishedTime = Instant.MIN; + private int totalReceivedEvents = 0; + private Instant lastEventTime = Instant.MIN; + + public AtomicInteger getCurrentNumberOfTasks() { + return numberOfTasks; + } + + public AtomicInteger getCurrentNumberOfSubscriptions() { + return numberOfSubscriptions; + } + + public synchronized void incNoOfReceivedEvents() { + totalReceivedEvents++; + lastEventTime = Instant.now(); + } + + public synchronized void incNoOfCollectedFiles() { + noOfCollectedFiles++; + } + + public synchronized void incNoOfFailedFtpAttempts() { + noOfFailedFtpAttempts++; + } + + public synchronized void incNoOfFailedFtp() { + noOfFailedFtp++; + } + + public synchronized void incNoOfFailedPublishAttempts() { + noOfFailedPublishAttempts++; + } + + public synchronized void incTotalPublishedFiles() { + totalPublishedFiles++; + lastPublishedTime = Instant.now(); + } + + public synchronized void incNoOfFailedPublish() { + noOfFailedPublish++; + } + + public synchronized String toString() { + StringBuilder str = new StringBuilder(); + str.append(format("totalReceivedEvents", totalReceivedEvents)); + str.append(format("lastEventTime", lastEventTime)); + str.append(format("numberOfTasks", numberOfTasks)); + str.append(format("numberOfSubscriptions", numberOfSubscriptions)); + str.append("\n"); + str.append(format("collectedFiles", noOfCollectedFiles)); + str.append(format("failedFtpAttempts", noOfFailedFtpAttempts)); + str.append(format("failedFtp", noOfFailedFtp)); + str.append("\n"); + str.append(format("totalPublishedFiles", totalPublishedFiles)); + str.append(format("lastPublishedTime", lastPublishedTime)); + + str.append(format("failedPublishAttempts", noOfFailedPublishAttempts)); + str.append(format("noOfFailedPublish", noOfFailedPublish)); + + return str.toString(); + } + + private String format(String name, Object value) { + String header = name + ":"; + return String.format("%-24s%-22s\n", header, value); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index d9efe802..02e153cf 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -22,19 +22,20 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; -import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; -import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.FileEntity; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; @@ -62,9 +63,11 @@ public class DataRouterPublisher { private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final AppConfig datafileAppConfig; + private final Counters counters; - public DataRouterPublisher(AppConfig datafileAppConfig) { + public DataRouterPublisher(AppConfig datafileAppConfig, Counters counters) { this.datafileAppConfig = datafileAppConfig; + this.counters = counters; } /** @@ -98,8 +101,10 @@ public class DataRouterPublisher { HttpResponse response = dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); logger.trace("{}", response); + counters.incTotalPublishedFiles(); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { + counters.incNoOfFailedPublishAttempts(); logger.warn("Publishing file {} to DR unsuccessful.", publishInfo.getName(), e); return Mono.error(e); } @@ -121,10 +126,9 @@ public class DataRouterPublisher { } private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException { - Path fileLocation = publishInfo.getInternalLocation(); - try (InputStream fileInputStream = createInputStream(fileLocation)) { - put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); - } + File file = createInputFile(publishInfo.getInternalLocation()); + FileEntity entity = new FileEntity(file, ContentType.DEFAULT_BINARY); + put.setEntity(entity); } private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, @@ -140,9 +144,9 @@ public class DataRouterPublisher { } } - InputStream createInputStream(Path filePath) throws IOException { + File createInputFile(Path filePath) throws IOException { FileSystemResource realResource = new FileSystemResource(filePath); - return realResource.getInputStream(); + return realResource.getFile(); } PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index aeacaffc..311f752d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -24,9 +24,11 @@ import java.util.Optional; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation; @@ -45,14 +47,16 @@ public class FileCollector { private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); private final AppConfig datafileAppConfig; + private final Counters counters; /** * Constructor. * * @param datafileAppConfig application configuration */ - public FileCollector(AppConfig datafileAppConfig) { + public FileCollector(AppConfig datafileAppConfig, Counters counters) { this.datafileAppConfig = datafileAppConfig; + this.counters = counters; } /** @@ -97,14 +101,16 @@ public class FileCollector { currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); + counters.incNoOfCollectedFiles(); return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context))); } catch (DatafileTaskException e) { logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), e.toString()); - if (e.isRetryable()) { - return Mono.error(e); - } else { + counters.incNoOfFailedFtpAttempts(); + if (e instanceof NonRetryableDatafileTaskException) { return Mono.just(Optional.empty()); // Give up + } else { + return Mono.error(e); } } catch (Exception throwable) { logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(), diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 99b2d918..300ca601 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -41,8 +42,8 @@ import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new - * files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { @@ -57,11 +58,12 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final AppConfig applicationConfiguration; - private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final AtomicInteger currentNumberOfTasks; private final AtomicInteger threadPoolQueueSize = new AtomicInteger(); - private final AtomicInteger currentNumberOfSubscriptions = new AtomicInteger(); + private final AtomicInteger currentNumberOfSubscriptions; private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); PublishedFileCache publishedFilesCache = new PublishedFileCache(); + private Counters counters = new Counters(); /** * Constructor for task registration in Datafile Workflow. @@ -71,6 +73,8 @@ public class ScheduledTasks { @Autowired public ScheduledTasks(AppConfig applicationConfiguration) { this.applicationConfiguration = applicationConfiguration; + this.currentNumberOfTasks = counters.getCurrentNumberOfTasks(); + this.currentNumberOfSubscriptions = counters.getCurrentNumberOfSubscriptions(); } /** @@ -112,6 +116,7 @@ public class ScheduledTasks { Flux<FilePublishInformation> createMainTask(Map<String, String> context) { return fetchMoreFileReadyMessages() // .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) // + .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) // .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) // @@ -148,32 +153,21 @@ public class ScheduledTasks { return new PublishedChecker(applicationConfiguration); } - public int getCurrentNumberOfTasks() { - return currentNumberOfTasks.get(); + public Counters getCounters() { + return this.counters; } - public int publishedFilesCacheSize() { - return publishedFilesCache.size(); - } - - public int getCurrentNumberOfSubscriptions() { - return currentNumberOfSubscriptions.get(); - } - - public int getThreadPoolQueueSize() { - return this.threadPoolQueueSize.get(); - } protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { return new DMaaPMessageConsumer(this.applicationConfiguration); } protected FileCollector createFileCollector() { - return new FileCollector(applicationConfiguration); + return new FileCollector(applicationConfiguration, counters); } protected DataRouterPublisher createDataRouterPublisher() { - return new DataRouterPublisher(applicationConfiguration); + return new DataRouterPublisher(applicationConfiguration, counters); } private static void onComplete(Map<String, String> contextMap) { @@ -181,6 +175,22 @@ public class ScheduledTasks { logger.trace("Datafile tasks have been completed"); } + int publishedFilesCacheSize() { + return publishedFilesCache.size(); + } + + int getCurrentNumberOfTasks() { + return this.currentNumberOfTasks.get(); + } + + int getCurrentNumberOfSubscriptions() { + return this.currentNumberOfSubscriptions.get(); + } + + int getThreadPoolQueueSize() { + return this.threadPoolQueueSize.get(); + } + private static synchronized void onSuccess(FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); logger.info("Datafile file published {}", publishInfo.getInternalLocation()); @@ -239,6 +249,7 @@ public class ScheduledTasks { deleteFile(localFilePath, fileData.context); publishedFilesCache.remove(localFilePath); currentNumberOfTasks.decrementAndGet(); + counters.incNoOfFailedFtp(); return Mono.empty(); } @@ -257,6 +268,7 @@ public class ScheduledTasks { deleteFile(internalFileName, publishInfo.getContext()); publishedFilesCache.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); + counters.incNoOfFailedPublish(); return Mono.empty(); } |