From e1a66425d3ba1df5ae2a8f2b99168707e08b655a Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 22 Mar 2019 14:29:29 +0000 Subject: Local filename updated, stability issues The local filename is changed so it contains PNF name instead of the PNF IP address. The paralellism is restricted to 100 worker threads in order to solve problems with too many open file descriptors and out of memory. Logging is improved. Change-Id: I24ce2e23020cc253a3c7bebac1ab5cf703b5b144 Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr --- .../datafile/configuration/AppConfig.java | 6 +- .../datafile/configuration/SchedulerConfig.java | 5 +- .../exceptions/DmaapNotFoundException.java | 31 ---- .../collectors/datafile/model/FileData.java | 27 +++- .../datafile/model/FileReadyMessage.java | 4 - .../datafile/service/JsonMessageParser.java | 36 ++--- .../datafile/tasks/DMaaPMessageConsumerTask.java | 28 ++-- .../datafile/tasks/DataRouterPublisher.java | 7 +- .../collectors/datafile/tasks/FileCollector.java | 33 ++--- .../datafile/tasks/PublishedChecker.java | 11 +- .../collectors/datafile/tasks/ScheduledTasks.java | 156 +++++++++------------ 11 files changed, 144 insertions(+), 200 deletions(-) delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java (limited to 'datafile-app-server/src/main/java') diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index f89a1012..a30d2826 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -112,10 +112,8 @@ public class AppConfig { setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig); } - } catch (IOException e) { - logger.error("Problem with file loading, file: {}", filepath, e); - } catch (JsonSyntaxException e) { - logger.error("Problem with Json deserialization", e); + } catch (JsonSyntaxException | IOException e) { + logger.error("Problem with loading configuration, file: {}", filepath, e); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 52723330..d5c8b3b2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -63,7 +63,7 @@ public class SchedulerConfig { private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class); private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); private static final Marker EXIT = MarkerFactory.getMarker("EXIT"); - private static volatile List> scheduledFutureList = new ArrayList<>(); + private static List> scheduledFutureList = new ArrayList<>(); private Map contextMap; private final TaskScheduler taskScheduler; @@ -97,8 +97,7 @@ public class SchedulerConfig { MdcVariables.setMdcContextMap(contextMap); logger.info(EXIT, "Stopped Datafile workflow"); MDC.clear(); - return Mono.defer(() -> Mono - .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); + return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)); } /** diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java deleted file mode 100644 index 36279016..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.exceptions; - -/** - * @author Przemysław Wąsala on 4/13/18 - */ -public class DmaapNotFoundException extends DatafileTaskException { - - private static final long serialVersionUID = 1L; - - public DmaapNotFoundException(String message) { - super(message); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index bdb47b2b..d0d1f91a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -37,10 +37,19 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; public abstract class FileData { private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/"; + /** + * @return the file name with no path + */ public abstract String name(); + /** + * @return the URL to use to fetch the file from the PNF + */ public abstract String location(); + /** + * @return the file transfer protocol to use for fetching the file + */ public abstract Scheme scheme(); public abstract String compression(); @@ -49,17 +58,25 @@ public abstract class FileData { public abstract String fileFormatVersion(); + public abstract MessageMetaData messageMetaData(); + + /** + * @return the name of the PNF, must be unique in the network + */ + public String sourceName() { + return messageMetaData().sourceName(); + } + public String remoteFilePath() { return URI.create(location()).getPath(); } public Path getLocalFileName() { - URI uri = URI.create(location()); - return createLocalFileName(uri.getHost(), name()); + return createLocalFileName(messageMetaData().sourceName(), name()); } - public static Path createLocalFileName(String host, String fileName) { - return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName); + public static Path createLocalFileName(String sourceName, String fileName) { + return Paths.get(DATAFILE_TMPDIR, sourceName + "_" + fileName); } public FileServerData fileServerData() { @@ -86,4 +103,4 @@ public abstract class FileData { } return Optional.empty(); } -} \ No newline at end of file +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java index e3293faa..9373a4f2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java @@ -31,9 +31,5 @@ import org.immutables.value.Value; @Value.Immutable @Gson.TypeAdapters public interface FileReadyMessage { - public String pnfName(); - - public MessageMetaData messageMetaData(); - public List files(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index af45cc99..a3595ecf 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -100,9 +100,13 @@ public class JsonMessageParser { Optional getJsonObjectFromAnArray(JsonElement element) { JsonParser jsonParser = new JsonParser(); - return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) - : element.isJsonObject() ? Optional.of((JsonObject) element) - : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + if (element.isJsonPrimitive()) { + return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()); + } else if (element.isJsonObject()) { + return Optional.of((JsonObject) element); + } else { + return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + } } private Flux getMessagesFromJsonArray(JsonElement jsonElement) { @@ -133,18 +137,17 @@ public class JsonMessageParser { : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); } + private Mono transformMessages(JsonObject message) { Optional optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { + MessageMetaData messageMetaData = optionalMessageMetaData.get(); JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); if (arrayOfNamedHashMap != null) { - List allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap); + List allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap, messageMetaData); if (!allFileDataFromJson.isEmpty()) { - MessageMetaData messageMetaData = optionalMessageMetaData.get(); return Mono.just(ImmutableFileReadyMessage.builder() // - .pnfName(messageMetaData.sourceName()) // - .messageMetaData(messageMetaData) // .files(allFileDataFromJson) // .build()); } else { @@ -152,13 +155,14 @@ public class JsonMessageParser { } } - logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); + logger.error("VES event parsing. Missing arrayOfNamedHashMap in message. {}", message); return Mono.empty(); } - logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); + logger.error("VES event parsing. FileReady event has incorrect JsonObject. {}", message); return Mono.empty(); } + private Optional getMessageMetaData(JsonObject message) { List missingValues = new ArrayList<>(); JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); @@ -185,7 +189,7 @@ public class JsonMessageParser { if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { return Optional.of(messageMetaData); } else { - String errorMessage = "Unable to collect file from xNF."; + String errorMessage = "VES event parsing."; if (!missingValues.isEmpty()) { errorMessage += " Missing data: " + missingValues; } @@ -206,11 +210,11 @@ public class JsonMessageParser { return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); } - private List getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) { + private List getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) { List res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - Optional fileData = getFileDataFromJson(fileInfo); + Optional fileData = getFileDataFromJson(fileInfo, messageMetaData); if (fileData.isPresent()) { res.add(fileData.get()); @@ -219,7 +223,7 @@ public class JsonMessageParser { return res; } - private Optional getFileDataFromJson(JsonObject fileInfo) { + private Optional getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) { logger.trace("starting to getFileDataFromJson!"); List missingValues = new ArrayList<>(); @@ -230,7 +234,7 @@ public class JsonMessageParser { try { scheme = Scheme.getSchemeFromString(URI.create(location).getScheme()); } catch (Exception e) { - logger.error("Unable to collect file from xNF.", e); + logger.error("VES event parsing.", e); return Optional.empty(); } FileData fileData = ImmutableFileData.builder() // @@ -240,12 +244,12 @@ public class JsonMessageParser { .location(location) // .scheme(scheme) // .compression(getValueFromJson(data, COMPRESSION, missingValues)) // + .messageMetaData(messageMetaData) .build(); if (missingValues.isEmpty()) { return Optional.of(fileData); } - logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}", - missingValues, fileInfo); + logger.error("VES event parsing. File information wrong. Missing data: {} Data: {}", missingValues, fileInfo); return Optional.empty(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java index f6daf733..49e2f01e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java @@ -30,8 +30,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; + import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author Henrik Andersson @@ -39,25 +40,21 @@ import reactor.core.publisher.Flux; public class DMaaPMessageConsumerTask { private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class); - private AppConfig datafileAppConfig; - private JsonMessageParser jsonMessageParser; - private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; + private final JsonMessageParser jsonMessageParser; + private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) { - this.datafileAppConfig = datafileAppConfig; this.jsonMessageParser = new JsonMessageParser(); + this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig); } - protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig, - DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + protected DMaaPMessageConsumerTask(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, JsonMessageParser messageParser) { - this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.jsonMessageParser = messageParser; } public Flux execute() { - dmaaPConsumerReactiveHttpClient = resolveClient(); logger.trace("execute called"); return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); } @@ -67,15 +64,10 @@ public class DMaaPMessageConsumerTask { return jsonMessageParser.getMessagesFromJson(message); } - protected DmaapConsumerConfiguration resolveConfiguration() { - return datafileAppConfig.getDmaapConsumerConfiguration(); + private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) { + DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration(); + WebClient client = new DmaapReactiveWebClient().fromConfiguration(config).build(); + return new DMaaPConsumerReactiveHttpClient(config, client); } - protected DMaaPConsumerReactiveHttpClient resolveClient() { - return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient()); - } - - protected WebClient buildWebClient() { - return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 0fef9ab4..4207d1fc 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -84,12 +84,13 @@ public class DataRouterPublisher { * @param model information about the file to publish * @param numRetries the maximal number of retries if the publishing fails * @param firstBackoff the time to delay the first retry - * @return the HTTP response status as a string + * @param contextMap tracing context variables + * @return the (same) ConsumerDmaapModel */ public Mono execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff, Map contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.trace("Method called with arg {}", model); + logger.trace("Publish called with arg {}", model); dmaapProducerReactiveHttpClient = resolveClient(); return Mono.just(model) @@ -117,7 +118,7 @@ public class DataRouterPublisher { logger.trace(response.toString()); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { - logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); + logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); return Mono.error(e); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index fb27a579..8849b45e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -49,31 +49,31 @@ public class FileCollector { this.datafileAppConfig = datafileAppConfig; } - public Mono execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries, - Duration firstBackoffTimeout, Map contextMap) { + public Mono execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout, + Map contextMap) { MdcVariables.setMdcContextMap(contextMap); logger.trace("Entering execute with {}", fileData); return Mono.just(fileData) // .cache() // - .flatMap(fd -> collectFile(fileData, metaData, contextMap)) // + .flatMap(fd -> collectFile(fileData, contextMap)) // .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); } - private Mono collectFile(FileData fileData, MessageMetaData metaData, - Map contextMap) { + private Mono collectFile(FileData fileData, Map contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.trace("starting to collectFile"); + logger.trace("starting to collectFile {}", fileData.name()); final String remoteFile = fileData.remoteFilePath(); final Path localFile = fileData.getLocalFileName(); try (FileCollectClient currentClient = createClient(fileData)) { + currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); - return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile)); + return Mono.just(getConsumerDmaapModel(fileData, localFile)); } catch (Exception throwable) { - logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable); + logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString()); return Mono.error(throwable); } } @@ -89,9 +89,9 @@ public class FileCollector { } } - private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) { + private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) { String location = fileData.location(); - + MessageMetaData metaData = fileData.messageMetaData(); return ImmutableConsumerDmaapModel.builder() // .productName(metaData.productName()) // .vendorName(metaData.vendorName()) // @@ -108,16 +108,13 @@ public class FileCollector { .build(); } - SftpClient createSftpClient(FileData fileData) throws DatafileTaskException { - SftpClient client = new SftpClient(fileData.fileServerData()); - client.open(); - return client; + protected SftpClient createSftpClient(FileData fileData) { + return new SftpClient(fileData.fileServerData()); } - FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException { + protected FtpsClient createFtpsClient(FileData fileData) { FtpesConfig config = datafileAppConfig.getFtpesConfiguration(); - FtpsClient client = new FtpsClient(fileData.fileServerData()); - client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword()); - return client; + return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(), + Paths.get(config.trustedCA()), config.trustedCAPassword()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java index 0729caa0..41f8e3cd 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -54,7 +54,7 @@ public class PublishedChecker { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private AppConfig appConfig; + private final AppConfig appConfig; /** * Constructor. @@ -88,12 +88,13 @@ public class PublishedChecker { HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap); - logger.trace(response.toString()); + logger.trace("{}", response); int status = response.getStatusLine().getStatusCode(); HttpEntity entity = response.getEntity(); - InputStream content = entity.getContent(); - String body = IOUtils.toString(content); - return HttpStatus.SC_OK == status && !"[]".equals(body); + try (InputStream content = entity.getContent()) { + String body = IOUtils.toString(content); + return HttpStatus.SC_OK == status && !"[]".equals(body); + } } catch (Exception e) { logger.warn("Unable to check if file has been published.", e); return false; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index d41e5c25..b4096c73 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -20,8 +20,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -29,7 +27,6 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; -import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache; import org.slf4j.Logger; @@ -44,32 +41,24 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new - * files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { - private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; - private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10; - - /** Data needed for fetching of one file. */ - private class FileCollectionData { - final FileData fileData; - final MessageMetaData metaData; - - FileCollectionData(FileData fd, MessageMetaData metaData) { - this.fileData = fd; - this.metaData = metaData; - } - } + private static final int NUMBER_OF_WORKER_THREADS = 100; + private static final int MAX_TASKS_FOR_POLLING = 50; + private static final long DATA_ROUTER_MAX_RETRIES = 5; + private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2); + private static final long FILE_TRANSFER_MAX_RETRIES = 3; + private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(5); private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final AppConfig applicationConfiguration; private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); - private final Scheduler scheduler = - Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS); + private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); PublishedFileCache alreadyPublishedFiles = new PublishedFileCache(); /** @@ -86,21 +75,25 @@ public class ScheduledTasks { * Main function for scheduling for the file collection Workflow. */ public void scheduleMainDatafileEventTask(Map contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.trace("Execution of tasks was registered"); - applicationConfiguration.loadConfigurationFromFile(); - createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), - () -> onComplete(contextMap)); + try { + MdcVariables.setMdcContextMap(contextMap); + logger.trace("Execution of tasks was registered"); + applicationConfiguration.loadConfigurationFromFile(); + createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), + () -> onComplete(contextMap)); + } catch (Exception e) { + logger.error("Unexpected exception: ", e); + } } Flux createMainTask(Map contextMap) { return fetchMoreFileReadyMessages() // - .parallel(getParallelism()) // Each FileReadyMessage in a separate thread + .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // - .flatMap(this::createFileCollectionTask) // + .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // .filter(fileData -> shouldBePublished(fileData, contextMap)) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) // + .flatMap(fileData -> fetchFile(fileData, contextMap)) // .flatMap(model -> publishToDataRouter(model, contextMap)) // .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) // .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // @@ -114,63 +107,61 @@ public class ScheduledTasks { alreadyPublishedFiles.purge(now); } - private void onComplete(Map contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.info("Datafile tasks have been completed"); + protected PublishedChecker createPublishedChecker() { + return new PublishedChecker(applicationConfiguration); } - private void onSuccess(ConsumerDmaapModel model, Map contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.info("Datafile consumed tasks {}", model.getInternalLocation()); + protected int getCurrentNumberOfTasks() { + return currentNumberOfTasks.get(); } - private void onError(Throwable throwable, Map contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); + protected DMaaPMessageConsumerTask createConsumerTask() { + return new DMaaPMessageConsumerTask(this.applicationConfiguration); } - private int getParallelism() { - if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) { - return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks(); - } else { - return 1; // We need at least one rail/thread - } + protected FileCollector createFileCollector() { + return new FileCollector(applicationConfiguration); + } + + protected DataRouterPublisher createDataRouterPublisher() { + return new DataRouterPublisher(applicationConfiguration); } - private Flux createFileCollectionTask(FileReadyMessage availableFiles) { - List fileCollects = new ArrayList<>(); + private void onComplete(Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.trace("Datafile tasks have been completed"); + } - for (FileData fileData : availableFiles.files()) { - fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData())); - } - return Flux.fromIterable(fileCollects); + private synchronized void onSuccess(ConsumerDmaapModel model, Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.info("Datafile file published {}", model.getInternalLocation()); } - private boolean shouldBePublished(FileCollectionData task, Map contextMap) { + private void onError(Throwable throwable, Map contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString()); + } + + private boolean shouldBePublished(FileData fileData, Map contextMap) { boolean result = false; - Path localFileName = task.fileData.getLocalFileName(); + Path localFileName = fileData.getLocalFileName(); if (alreadyPublishedFiles.put(localFileName) == null) { result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap); } return result; } - private Mono collectFileFromXnf(FileCollectionData fileCollect, - Map contextMap) { - final long maxNUmberOfRetries = 3; - final Duration initialRetryTimeout = Duration.ofSeconds(5); - + private Mono fetchFile(FileData fileData, Map contextMap) { MdcVariables.setMdcContextMap(contextMap); return createFileCollector() - .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout, - contextMap) - .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap)); + .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap) + .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap)); } - private Mono handleCollectFailure(FileData fileData, Map contextMap) { + private Mono handleFetchFileFailure(FileData fileData, Map contextMap) { MdcVariables.setMdcContextMap(contextMap); Path localFileName = fileData.getLocalFileName(); - logger.error("File fetching failed: {}", localFileName); + logger.error("File fetching failed, fileData {}", fileData); deleteFile(localFileName, contextMap); alreadyPublishedFiles.remove(localFileName); currentNumberOfTasks.decrementAndGet(); @@ -178,19 +169,16 @@ public class ScheduledTasks { } private Mono publishToDataRouter(ConsumerDmaapModel model, Map contextMap) { - final long maxNumberOfRetries = 3; - final Duration initialRetryTimeout = Duration.ofSeconds(5); - - MdcVariables.setMdcContextMap(contextMap); - return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap) - .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap)); + + return createDataRouterPublisher() + .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap) + .onErrorResume(exception -> handlePublishFailure(model, contextMap)); } - private Mono handlePublishFailure(ConsumerDmaapModel model, Throwable exception, - Map contextMap) { + private Mono handlePublishFailure(ConsumerDmaapModel model, Map contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); + logger.error("File publishing failed: {}", model); Path internalFileName = model.getInternalLocation(); deleteFile(internalFileName, contextMap); alreadyPublishedFiles.remove(internalFileName); @@ -202,8 +190,9 @@ public class ScheduledTasks { * Fetch more messages from the message router. This is done in a polling/blocking fashion. */ private Flux fetchMoreFileReadyMessages() { - logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); - if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) { + logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); + if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { + logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks()); return Flux.empty(); } @@ -215,7 +204,8 @@ public class ScheduledTasks { private Flux handleConsumeMessageFailure(Throwable exception, Map contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.error("Polling for file ready message failed, exception: {}", exception); + logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), + this.applicationConfiguration.getDmaapConsumerConfiguration()); return Flux.empty(); } @@ -229,24 +219,4 @@ public class ScheduledTasks { } } - PublishedChecker createPublishedChecker() { - return new PublishedChecker(applicationConfiguration); - } - - int getCurrentNumberOfTasks() { - return currentNumberOfTasks.get(); - } - - DMaaPMessageConsumerTask createConsumerTask() { - return new DMaaPMessageConsumerTask(this.applicationConfiguration); - } - - FileCollector createFileCollector() { - return new FileCollector(applicationConfiguration); - } - - DataRouterPublisher createDataRouterPublisher() { - return new DataRouterPublisher(applicationConfiguration); - } - } -- cgit 1.2.3-korg