diff options
Diffstat (limited to 'datafile-app-server/src')
17 files changed, 204 insertions, 253 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index f89a1012..a30d2826 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -112,10 +112,8 @@ public class AppConfig { setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig); } - } catch (IOException e) { - logger.error("Problem with file loading, file: {}", filepath, e); - } catch (JsonSyntaxException e) { - logger.error("Problem with Json deserialization", e); + } catch (JsonSyntaxException | IOException e) { + logger.error("Problem with loading configuration, file: {}", filepath, e); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 52723330..d5c8b3b2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -63,7 +63,7 @@ public class SchedulerConfig { private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class); private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); private static final Marker EXIT = MarkerFactory.getMarker("EXIT"); - private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); + private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); private Map<String, String> contextMap; private final TaskScheduler taskScheduler; @@ -97,8 +97,7 @@ public class SchedulerConfig { MdcVariables.setMdcContextMap(contextMap); logger.info(EXIT, "Stopped Datafile workflow"); MDC.clear(); - return Mono.defer(() -> Mono - .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); + return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)); } /** diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java deleted file mode 100644 index 36279016..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.exceptions; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 - */ -public class DmaapNotFoundException extends DatafileTaskException { - - private static final long serialVersionUID = 1L; - - public DmaapNotFoundException(String message) { - super(message); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index bdb47b2b..d0d1f91a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -37,10 +37,19 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; public abstract class FileData { private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/"; + /** + * @return the file name with no path + */ public abstract String name(); + /** + * @return the URL to use to fetch the file from the PNF + */ public abstract String location(); + /** + * @return the file transfer protocol to use for fetching the file + */ public abstract Scheme scheme(); public abstract String compression(); @@ -49,17 +58,25 @@ public abstract class FileData { public abstract String fileFormatVersion(); + public abstract MessageMetaData messageMetaData(); + + /** + * @return the name of the PNF, must be unique in the network + */ + public String sourceName() { + return messageMetaData().sourceName(); + } + public String remoteFilePath() { return URI.create(location()).getPath(); } public Path getLocalFileName() { - URI uri = URI.create(location()); - return createLocalFileName(uri.getHost(), name()); + return createLocalFileName(messageMetaData().sourceName(), name()); } - public static Path createLocalFileName(String host, String fileName) { - return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName); + public static Path createLocalFileName(String sourceName, String fileName) { + return Paths.get(DATAFILE_TMPDIR, sourceName + "_" + fileName); } public FileServerData fileServerData() { @@ -86,4 +103,4 @@ public abstract class FileData { } return Optional.empty(); } -}
\ No newline at end of file +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java index e3293faa..9373a4f2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java @@ -31,9 +31,5 @@ import org.immutables.value.Value; @Value.Immutable @Gson.TypeAdapters public interface FileReadyMessage { - public String pnfName(); - - public MessageMetaData messageMetaData(); - public List<FileData> files(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index af45cc99..a3595ecf 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -100,9 +100,13 @@ public class JsonMessageParser { Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { JsonParser jsonParser = new JsonParser(); - return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) - : element.isJsonObject() ? Optional.of((JsonObject) element) - : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + if (element.isJsonPrimitive()) { + return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()); + } else if (element.isJsonObject()) { + return Optional.of((JsonObject) element); + } else { + return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + } } private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) { @@ -133,18 +137,17 @@ public class JsonMessageParser { : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); } + private Mono<FileReadyMessage> transformMessages(JsonObject message) { Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { + MessageMetaData messageMetaData = optionalMessageMetaData.get(); JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); if (arrayOfNamedHashMap != null) { - List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap); + List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap, messageMetaData); if (!allFileDataFromJson.isEmpty()) { - MessageMetaData messageMetaData = optionalMessageMetaData.get(); return Mono.just(ImmutableFileReadyMessage.builder() // - .pnfName(messageMetaData.sourceName()) // - .messageMetaData(messageMetaData) // .files(allFileDataFromJson) // .build()); } else { @@ -152,13 +155,14 @@ public class JsonMessageParser { } } - logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); + logger.error("VES event parsing. Missing arrayOfNamedHashMap in message. {}", message); return Mono.empty(); } - logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); + logger.error("VES event parsing. FileReady event has incorrect JsonObject. {}", message); return Mono.empty(); } + private Optional<MessageMetaData> getMessageMetaData(JsonObject message) { List<String> missingValues = new ArrayList<>(); JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); @@ -185,7 +189,7 @@ public class JsonMessageParser { if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { return Optional.of(messageMetaData); } else { - String errorMessage = "Unable to collect file from xNF."; + String errorMessage = "VES event parsing."; if (!missingValues.isEmpty()) { errorMessage += " Missing data: " + missingValues; } @@ -206,11 +210,11 @@ public class JsonMessageParser { return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); } - private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) { + private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - Optional<FileData> fileData = getFileDataFromJson(fileInfo); + Optional<FileData> fileData = getFileDataFromJson(fileInfo, messageMetaData); if (fileData.isPresent()) { res.add(fileData.get()); @@ -219,7 +223,7 @@ public class JsonMessageParser { return res; } - private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) { + private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) { logger.trace("starting to getFileDataFromJson!"); List<String> missingValues = new ArrayList<>(); @@ -230,7 +234,7 @@ public class JsonMessageParser { try { scheme = Scheme.getSchemeFromString(URI.create(location).getScheme()); } catch (Exception e) { - logger.error("Unable to collect file from xNF.", e); + logger.error("VES event parsing.", e); return Optional.empty(); } FileData fileData = ImmutableFileData.builder() // @@ -240,12 +244,12 @@ public class JsonMessageParser { .location(location) // .scheme(scheme) // .compression(getValueFromJson(data, COMPRESSION, missingValues)) // + .messageMetaData(messageMetaData) .build(); if (missingValues.isEmpty()) { return Optional.of(fileData); } - logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}", - missingValues, fileInfo); + logger.error("VES event parsing. File information wrong. Missing data: {} Data: {}", missingValues, fileInfo); return Optional.empty(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java index f6daf733..49e2f01e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java @@ -30,8 +30,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; + import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> @@ -39,25 +40,21 @@ import reactor.core.publisher.Flux; public class DMaaPMessageConsumerTask { private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class); - private AppConfig datafileAppConfig; - private JsonMessageParser jsonMessageParser; - private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; + private final JsonMessageParser jsonMessageParser; + private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) { - this.datafileAppConfig = datafileAppConfig; this.jsonMessageParser = new JsonMessageParser(); + this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig); } - protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig, - DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + protected DMaaPMessageConsumerTask(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, JsonMessageParser messageParser) { - this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.jsonMessageParser = messageParser; } public Flux<FileReadyMessage> execute() { - dmaaPConsumerReactiveHttpClient = resolveClient(); logger.trace("execute called"); return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); } @@ -67,15 +64,10 @@ public class DMaaPMessageConsumerTask { return jsonMessageParser.getMessagesFromJson(message); } - protected DmaapConsumerConfiguration resolveConfiguration() { - return datafileAppConfig.getDmaapConsumerConfiguration(); + private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) { + DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration(); + WebClient client = new DmaapReactiveWebClient().fromConfiguration(config).build(); + return new DMaaPConsumerReactiveHttpClient(config, client); } - protected DMaaPConsumerReactiveHttpClient resolveClient() { - return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient()); - } - - protected WebClient buildWebClient() { - return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 0fef9ab4..4207d1fc 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -84,12 +84,13 @@ public class DataRouterPublisher { * @param model information about the file to publish * @param numRetries the maximal number of retries if the publishing fails * @param firstBackoff the time to delay the first retry - * @return the HTTP response status as a string + * @param contextMap tracing context variables + * @return the (same) ConsumerDmaapModel */ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.trace("Method called with arg {}", model); + logger.trace("Publish called with arg {}", model); dmaapProducerReactiveHttpClient = resolveClient(); return Mono.just(model) @@ -117,7 +118,7 @@ public class DataRouterPublisher { logger.trace(response.toString()); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { - logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); + logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); return Mono.error(e); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index fb27a579..8849b45e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -49,31 +49,31 @@ public class FileCollector { this.datafileAppConfig = datafileAppConfig; } - public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries, - Duration firstBackoffTimeout, Map<String, String> contextMap) { + public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout, + Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); logger.trace("Entering execute with {}", fileData); return Mono.just(fileData) // .cache() // - .flatMap(fd -> collectFile(fileData, metaData, contextMap)) // + .flatMap(fd -> collectFile(fileData, contextMap)) // .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); } - private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData, - Map<String, String> contextMap) { + private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.trace("starting to collectFile"); + logger.trace("starting to collectFile {}", fileData.name()); final String remoteFile = fileData.remoteFilePath(); final Path localFile = fileData.getLocalFileName(); try (FileCollectClient currentClient = createClient(fileData)) { + currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); - return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile)); + return Mono.just(getConsumerDmaapModel(fileData, localFile)); } catch (Exception throwable) { - logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable); + logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString()); return Mono.error(throwable); } } @@ -89,9 +89,9 @@ public class FileCollector { } } - private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) { + private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) { String location = fileData.location(); - + MessageMetaData metaData = fileData.messageMetaData(); return ImmutableConsumerDmaapModel.builder() // .productName(metaData.productName()) // .vendorName(metaData.vendorName()) // @@ -108,16 +108,13 @@ public class FileCollector { .build(); } - SftpClient createSftpClient(FileData fileData) throws DatafileTaskException { - SftpClient client = new SftpClient(fileData.fileServerData()); - client.open(); - return client; + protected SftpClient createSftpClient(FileData fileData) { + return new SftpClient(fileData.fileServerData()); } - FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException { + protected FtpsClient createFtpsClient(FileData fileData) { FtpesConfig config = datafileAppConfig.getFtpesConfiguration(); - FtpsClient client = new FtpsClient(fileData.fileServerData()); - client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword()); - return client; + return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(), + Paths.get(config.trustedCA()), config.trustedCAPassword()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java index 0729caa0..41f8e3cd 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -54,7 +54,7 @@ public class PublishedChecker { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private AppConfig appConfig; + private final AppConfig appConfig; /** * Constructor. @@ -88,12 +88,13 @@ public class PublishedChecker { HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap); - logger.trace(response.toString()); + logger.trace("{}", response); int status = response.getStatusLine().getStatusCode(); HttpEntity entity = response.getEntity(); - InputStream content = entity.getContent(); - String body = IOUtils.toString(content); - return HttpStatus.SC_OK == status && !"[]".equals(body); + try (InputStream content = entity.getContent()) { + String body = IOUtils.toString(content); + return HttpStatus.SC_OK == status && !"[]".equals(body); + } } catch (Exception e) { logger.warn("Unable to check if file has been published.", e); return false; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index d41e5c25..b4096c73 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -20,8 +20,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -29,7 +27,6 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; -import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache; import org.slf4j.Logger; @@ -44,32 +41,24 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new - * files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { - private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; - private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10; - - /** Data needed for fetching of one file. */ - private class FileCollectionData { - final FileData fileData; - final MessageMetaData metaData; - - FileCollectionData(FileData fd, MessageMetaData metaData) { - this.fileData = fd; - this.metaData = metaData; - } - } + private static final int NUMBER_OF_WORKER_THREADS = 100; + private static final int MAX_TASKS_FOR_POLLING = 50; + private static final long DATA_ROUTER_MAX_RETRIES = 5; + private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2); + private static final long FILE_TRANSFER_MAX_RETRIES = 3; + private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(5); private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final AppConfig applicationConfiguration; private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); - private final Scheduler scheduler = - Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS); + private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); PublishedFileCache alreadyPublishedFiles = new PublishedFileCache(); /** @@ -86,21 +75,25 @@ public class ScheduledTasks { * Main function for scheduling for the file collection Workflow. */ public void scheduleMainDatafileEventTask(Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.trace("Execution of tasks was registered"); - applicationConfiguration.loadConfigurationFromFile(); - createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), - () -> onComplete(contextMap)); + try { + MdcVariables.setMdcContextMap(contextMap); + logger.trace("Execution of tasks was registered"); + applicationConfiguration.loadConfigurationFromFile(); + createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), + () -> onComplete(contextMap)); + } catch (Exception e) { + logger.error("Unexpected exception: ", e); + } } Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) { return fetchMoreFileReadyMessages() // - .parallel(getParallelism()) // Each FileReadyMessage in a separate thread + .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // - .flatMap(this::createFileCollectionTask) // + .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // .filter(fileData -> shouldBePublished(fileData, contextMap)) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) // + .flatMap(fileData -> fetchFile(fileData, contextMap)) // .flatMap(model -> publishToDataRouter(model, contextMap)) // .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) // .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // @@ -114,63 +107,61 @@ public class ScheduledTasks { alreadyPublishedFiles.purge(now); } - private void onComplete(Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.info("Datafile tasks have been completed"); + protected PublishedChecker createPublishedChecker() { + return new PublishedChecker(applicationConfiguration); } - private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.info("Datafile consumed tasks {}", model.getInternalLocation()); + protected int getCurrentNumberOfTasks() { + return currentNumberOfTasks.get(); } - private void onError(Throwable throwable, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); + protected DMaaPMessageConsumerTask createConsumerTask() { + return new DMaaPMessageConsumerTask(this.applicationConfiguration); } - private int getParallelism() { - if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) { - return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks(); - } else { - return 1; // We need at least one rail/thread - } + protected FileCollector createFileCollector() { + return new FileCollector(applicationConfiguration); + } + + protected DataRouterPublisher createDataRouterPublisher() { + return new DataRouterPublisher(applicationConfiguration); } - private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) { - List<FileCollectionData> fileCollects = new ArrayList<>(); + private void onComplete(Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.trace("Datafile tasks have been completed"); + } - for (FileData fileData : availableFiles.files()) { - fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData())); - } - return Flux.fromIterable(fileCollects); + private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.info("Datafile file published {}", model.getInternalLocation()); } - private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) { + private void onError(Throwable throwable, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString()); + } + + private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) { boolean result = false; - Path localFileName = task.fileData.getLocalFileName(); + Path localFileName = fileData.getLocalFileName(); if (alreadyPublishedFiles.put(localFileName) == null) { result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap); } return result; } - private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect, - Map<String, String> contextMap) { - final long maxNUmberOfRetries = 3; - final Duration initialRetryTimeout = Duration.ofSeconds(5); - + private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); return createFileCollector() - .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout, - contextMap) - .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap)); + .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap) + .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap)); } - private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) { + private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); Path localFileName = fileData.getLocalFileName(); - logger.error("File fetching failed: {}", localFileName); + logger.error("File fetching failed, fileData {}", fileData); deleteFile(localFileName, contextMap); alreadyPublishedFiles.remove(localFileName); currentNumberOfTasks.decrementAndGet(); @@ -178,19 +169,16 @@ public class ScheduledTasks { } private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) { - final long maxNumberOfRetries = 3; - final Duration initialRetryTimeout = Duration.ofSeconds(5); - - MdcVariables.setMdcContextMap(contextMap); - return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap) - .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap)); + + return createDataRouterPublisher() + .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap) + .onErrorResume(exception -> handlePublishFailure(model, contextMap)); } - private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception, - Map<String, String> contextMap) { + private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); + logger.error("File publishing failed: {}", model); Path internalFileName = model.getInternalLocation(); deleteFile(internalFileName, contextMap); alreadyPublishedFiles.remove(internalFileName); @@ -202,8 +190,9 @@ public class ScheduledTasks { * Fetch more messages from the message router. This is done in a polling/blocking fashion. */ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { - logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); - if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) { + logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); + if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { + logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks()); return Flux.empty(); } @@ -215,7 +204,8 @@ public class ScheduledTasks { private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.error("Polling for file ready message failed, exception: {}", exception); + logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), + this.applicationConfiguration.getDmaapConsumerConfiguration()); return Flux.empty(); } @@ -229,24 +219,4 @@ public class ScheduledTasks { } } - PublishedChecker createPublishedChecker() { - return new PublishedChecker(applicationConfiguration); - } - - int getCurrentNumberOfTasks() { - return currentNumberOfTasks.get(); - } - - DMaaPMessageConsumerTask createConsumerTask() { - return new DMaaPMessageConsumerTask(this.applicationConfiguration); - } - - FileCollector createFileCollector() { - return new FileCollector(applicationConfiguration); - } - - DataRouterPublisher createDataRouterPublisher() { - return new DataRouterPublisher(applicationConfiguration); - } - } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java index 1f5827c8..84c5e07b 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java @@ -44,6 +44,20 @@ public class FileDataTest { private static final String LOCATION_WITHOUT_USER = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + + private MessageMetaData messageMetaData() { + return ImmutableMessageMetaData.builder() + .productName("PRODUCT_NAME") + .vendorName("VENDOR_NAME") + .lastEpochMicrosec("LAST_EPOCH_MICROSEC") + .sourceName("SOURCE_NAME") + .startEpochMicrosec("START_EPOCH_MICROSEC") + .timeZoneOffset("TIME_ZONE_OFFSET") + .changeIdentifier("PM_MEAS_CHANGE_IDENTIFIER") + .changeType("FILE_READY_CHANGE_TYPE") + .build(); + } + private FileData properFileDataWithUser() { // @formatter:off return ImmutableFileData.builder() @@ -53,6 +67,7 @@ public class FileDataTest { .fileFormatType("type") .fileFormatVersion("version") .scheme(Scheme.FTPS) + .messageMetaData(messageMetaData()) .build(); // @formatter:on } @@ -66,6 +81,7 @@ public class FileDataTest { .fileFormatType("type") .fileFormatVersion("version") .scheme(Scheme.FTPS) + .messageMetaData(messageMetaData()) .build(); // @formatter:on } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java index b33180fa..b8aa7da2 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java @@ -29,7 +29,6 @@ import java.util.Optional; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -67,7 +66,7 @@ class JsonMessageParserTest { private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; @Test - void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException { + void whenPassingCorrectJson_oneFileReadyMessage() { AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // .name(PM_FILE_NAME) // .location(LOCATION) // @@ -100,12 +99,11 @@ class JsonMessageParserTest { .compression(GZIP_COMPRESSION) // .fileFormatType(FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); List<FileData> files = new ArrayList<>(); files.add(expectedFileData); FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() - .pnfName(SOURCE_NAME) // - .messageMetaData(messageMetaData) // .files(files) // .build(); @@ -121,7 +119,7 @@ class JsonMessageParserTest { } @Test - void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException { + void whenPassingCorrectJsonWithTwoEvents_twoMessages() { AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // .name(PM_FILE_NAME) // .location(LOCATION) // @@ -154,12 +152,11 @@ class JsonMessageParserTest { .compression(GZIP_COMPRESSION) // .fileFormatType(FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); List<FileData> files = new ArrayList<>(); files.add(expectedFileData); FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() // - .pnfName(SOURCE_NAME) // - .messageMetaData(messageMetaData) // .files(files) // .build(); @@ -202,7 +199,7 @@ class JsonMessageParserTest { } @Test - void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException { + void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() { AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // .name(PM_FILE_NAME) // .location(LOCATION) // @@ -235,12 +232,11 @@ class JsonMessageParserTest { .compression(GZIP_COMPRESSION) // .fileFormatType(FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); List<FileData> files = new ArrayList<>(); files.add(expectedFileData); FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() // - .pnfName(SOURCE_NAME) // - .messageMetaData(messageMetaData) // .files(files) // .build(); @@ -421,12 +417,11 @@ class JsonMessageParserTest { .compression(GZIP_COMPRESSION) // .fileFormatType(FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); List<FileData> files = new ArrayList<>(); files.add(expectedFileData); FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() // - .pnfName(SOURCE_NAME) // - .messageMetaData(messageMetaData) // .files(files) // .build(); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java index a695e20d..98c7dc32 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java @@ -20,7 +20,6 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -156,13 +155,12 @@ public class DMaaPMessageConsumerTaskImplTest { .compression(GZIP_COMPRESSION) // .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); List<FileData> files = new ArrayList<>(); files.add(ftpesFileData); expectedFtpesMessage = ImmutableFileReadyMessage.builder() // - .pnfName(SOURCE_NAME) // - .messageMetaData(messageMetaData) // .files(files) // .build(); @@ -187,6 +185,7 @@ public class DMaaPMessageConsumerTaskImplTest { .compression(GZIP_COMPRESSION) // .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() // @@ -208,8 +207,6 @@ public class DMaaPMessageConsumerTaskImplTest { files = new ArrayList<>(); files.add(sftpFileData); expectedSftpMessage = ImmutableFileReadyMessage.builder() // - .pnfName(SOURCE_NAME) // - .messageMetaData(messageMetaData) // .files(files) // .build(); } @@ -264,8 +261,6 @@ public class DMaaPMessageConsumerTaskImplTest { .thenReturn(Flux.error(new DatafileTaskException("problemas"))); } - messageConsumerTask = spy(new DMaaPMessageConsumerTask(appConfig, httpClientMock, jsonMessageParserMock)); - when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); - doReturn(httpClientMock).when(messageConsumerTask).resolveClient(); + messageConsumerTask = spy(new DMaaPMessageConsumerTask(httpClientMock, jsonMessageParserMock)); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index ed8b93f1..fe867738 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -86,13 +86,11 @@ class DataRouterPublisherTest { private static final String FEED_ID = "1"; private static final String FILE_CONTENT = "Just a string."; - private static final Map<String, String> CONTEXT_MAP = new HashMap<>(); - private static ConsumerDmaapModel consumerDmaapModel; private static DmaapProducerReactiveHttpClient httpClientMock; private static AppConfig appConfig; private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class); - + private final Map<String, String> contextMap = new HashMap<>(); private static DataRouterPublisher publisherTaskUnderTestSpy; /** @@ -125,9 +123,8 @@ class DataRouterPublisherTest { @Test public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception { prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value())); - StepVerifier - .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP)) + .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) .expectNext(consumerDmaapModel) // .verifyComplete(); @@ -170,7 +167,7 @@ class DataRouterPublisherTest { prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value()); StepVerifier - .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), CONTEXT_MAP)) + .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), contextMap)) .expectNext(consumerDmaapModel) // .verifyComplete(); } @@ -181,7 +178,7 @@ class DataRouterPublisherTest { Integer.valueOf(HttpStatus.OK.value())); StepVerifier - .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP)) + .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) .expectNext(consumerDmaapModel) // .verifyComplete(); @@ -197,7 +194,7 @@ class DataRouterPublisherTest { Integer.valueOf((HttpStatus.BAD_GATEWAY.value()))); StepVerifier - .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP)) + .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) .expectErrorMessage("Retries exhausted: 1/1") // .verify(); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index fb49c860..6e17f27b 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -67,7 +67,7 @@ public class FileCollectorTest { private static final int PORT_22 = 22; private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; - private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME); + private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SOURCE_NAME, PM_FILE_NAME); private static final String USER = "usr"; private static final String PWD = "pwd"; private static final String FTPES_LOCATION = @@ -93,6 +93,7 @@ public class FileCollectorTest { private FtpsClient ftpsClientMock = mock(FtpsClient.class); private SftpClient sftpClientMock = mock(SftpClient.class); + private final Map<String, String> contextMap = new HashMap<>(); private MessageMetaData createMessageMetaData() { @@ -119,6 +120,7 @@ public class FileCollectorTest { .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .scheme(scheme) + .messageMetaData(createMessageMetaData()) .build(); // @formatter:on } @@ -160,11 +162,11 @@ public class FileCollectorTest { ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); - Map<String, String> contextMap = new HashMap<>(); StepVerifier.create( - collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) + collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap)) .expectNext(expectedConsumerDmaapModel).verifyComplete(); + verify(ftpsClientMock, times(1)).open(); verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verify(ftpsClientMock, times(1)).close(); @@ -176,12 +178,12 @@ public class FileCollectorTest { FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any()); + FileData fileData = createFileData(SFTP_LOCATION_NO_PORT, Scheme.SFTP); ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT); - Map<String, String> contextMap = new HashMap<>(); StepVerifier - .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), + .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap)) .expectNext(expectedConsumerDmaapModel) // .verifyComplete(); @@ -191,11 +193,12 @@ public class FileCollectorTest { expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION); StepVerifier - .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), + .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap)) .expectNext(expectedConsumerDmaapModel) // .verifyComplete(); + verify(sftpClientMock, times(2)).open(); verify(sftpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verify(sftpClientMock, times(2)).close(); verifyNoMoreInteractions(sftpClientMock); @@ -210,9 +213,8 @@ public class FileCollectorTest { doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - Map<String, String> contextMap = new HashMap<>(); StepVerifier.create( - collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) + collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap)) .expectErrorMessage("Retries exhausted: 3/3").verify(); verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); @@ -228,9 +230,9 @@ public class FileCollectorTest { ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS); - Map<String, String> contextMap = new HashMap<>(); + StepVerifier.create( - collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) + collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap)) .expectNext(expectedConsumerDmaapModel).verifyComplete(); verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index d781cea3..f6beae02 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -30,8 +30,10 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import java.nio.file.Paths; import java.time.Duration; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -64,6 +66,7 @@ public class ScheduledTasksTest { private PublishedChecker publishedCheckerMock; private FileCollector fileCollectorMock; private DataRouterPublisher dataRouterMock; + private Map<String, String> contextMap = new HashMap<String, String>(); @BeforeEach private void setUp() { @@ -115,6 +118,7 @@ public class ScheduledTasksTest { .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) // .scheme(Scheme.FTPS) // .compression("") // + .messageMetaData(messageMetaData()) .build(); } @@ -130,9 +134,7 @@ public class ScheduledTasksTest { } private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) { - MessageMetaData md = messageMetaData(); - return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md) - .files(files(numberOfFiles, uniqueNames)).build(); + return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build(); } private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) { @@ -185,17 +187,17 @@ public class ScheduledTasksTest { doReturn(false).when(publishedCheckerMock).execute(anyString(), any()); Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); - doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull()); doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any()); - StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() // + StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() // .expectNextCount(noOfFiles) // .expectComplete() // .verify(); // assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).execute(); - verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any()); + verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); @@ -215,19 +217,19 @@ public class ScheduledTasksTest { // First file collect will fail, 3 will succeed doReturn(error, collectedFile, collectedFile, collectedFile) // .when(fileCollectorMock) // - .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any()); + .execute(any(FileData.class), anyLong(), any(Duration.class), notNull()); doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any()); doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any()); - StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() // + StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() // .expectNextCount(3) // .expectComplete() // .verify(); // assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).execute(); - verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any()); + verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); @@ -243,7 +245,7 @@ public class ScheduledTasksTest { doReturn(false).when(publishedCheckerMock).execute(anyString(), any()); Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); - doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull()); Mono<Object> error = Mono.error(new Exception("problem")); // One publish will fail, the rest will succeed @@ -251,14 +253,14 @@ public class ScheduledTasksTest { .when(dataRouterMock) // .execute(notNull(), anyLong(), notNull(), any()); - StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() // + StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() // .expectNextCount(3) // 3 completed files .expectComplete() // .verify(); // assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).execute(); - verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any()); + verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); @@ -277,17 +279,17 @@ public class ScheduledTasksTest { doReturn(false).when(publishedCheckerMock).execute(anyString(), any()); Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); - doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull()); doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any()); - StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() // + StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() // .expectNextCount(1) // 99 is skipped .expectComplete() // .verify(); // assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).execute(); - verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any()); + verify(fileCollectorMock, times(1)).execute(notNull(), anyLong(), notNull(), notNull()); verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); |