summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java5
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java36
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java28
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java33
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java11
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java156
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java16
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java19
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java11
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java13
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java22
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java32
17 files changed, 204 insertions, 253 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index f89a1012..a30d2826 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -112,10 +112,8 @@ public class AppConfig {
setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
}
- } catch (IOException e) {
- logger.error("Problem with file loading, file: {}", filepath, e);
- } catch (JsonSyntaxException e) {
- logger.error("Problem with Json deserialization", e);
+ } catch (JsonSyntaxException | IOException e) {
+ logger.error("Problem with loading configuration, file: {}", filepath, e);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 52723330..d5c8b3b2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -63,7 +63,7 @@ public class SchedulerConfig {
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
private static final Marker EXIT = MarkerFactory.getMarker("EXIT");
- private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
+ private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private Map<String, String> contextMap;
private final TaskScheduler taskScheduler;
@@ -97,8 +97,7 @@ public class SchedulerConfig {
MdcVariables.setMdcContextMap(contextMap);
logger.info(EXIT, "Stopped Datafile workflow");
MDC.clear();
- return Mono.defer(() -> Mono
- .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+ return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED));
}
/**
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
deleted file mode 100644
index 36279016..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.exceptions;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- */
-public class DmaapNotFoundException extends DatafileTaskException {
-
- private static final long serialVersionUID = 1L;
-
- public DmaapNotFoundException(String message) {
- super(message);
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index bdb47b2b..d0d1f91a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -37,10 +37,19 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
public abstract class FileData {
private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
+ /**
+ * @return the file name with no path
+ */
public abstract String name();
+ /**
+ * @return the URL to use to fetch the file from the PNF
+ */
public abstract String location();
+ /**
+ * @return the file transfer protocol to use for fetching the file
+ */
public abstract Scheme scheme();
public abstract String compression();
@@ -49,17 +58,25 @@ public abstract class FileData {
public abstract String fileFormatVersion();
+ public abstract MessageMetaData messageMetaData();
+
+ /**
+ * @return the name of the PNF, must be unique in the network
+ */
+ public String sourceName() {
+ return messageMetaData().sourceName();
+ }
+
public String remoteFilePath() {
return URI.create(location()).getPath();
}
public Path getLocalFileName() {
- URI uri = URI.create(location());
- return createLocalFileName(uri.getHost(), name());
+ return createLocalFileName(messageMetaData().sourceName(), name());
}
- public static Path createLocalFileName(String host, String fileName) {
- return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName);
+ public static Path createLocalFileName(String sourceName, String fileName) {
+ return Paths.get(DATAFILE_TMPDIR, sourceName + "_" + fileName);
}
public FileServerData fileServerData() {
@@ -86,4 +103,4 @@ public abstract class FileData {
}
return Optional.empty();
}
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index e3293faa..9373a4f2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -31,9 +31,5 @@ import org.immutables.value.Value;
@Value.Immutable
@Gson.TypeAdapters
public interface FileReadyMessage {
- public String pnfName();
-
- public MessageMetaData messageMetaData();
-
public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index af45cc99..a3595ecf 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -100,9 +100,13 @@ public class JsonMessageParser {
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
JsonParser jsonParser = new JsonParser();
- return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
- : element.isJsonObject() ? Optional.of((JsonObject) element)
- : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ if (element.isJsonPrimitive()) {
+ return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject());
+ } else if (element.isJsonObject()) {
+ return Optional.of((JsonObject) element);
+ } else {
+ return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ }
}
private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
@@ -133,18 +137,17 @@ public class JsonMessageParser {
: logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
+
private Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
+ MessageMetaData messageMetaData = optionalMessageMetaData.get();
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (arrayOfNamedHashMap != null) {
- List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
+ List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap, messageMetaData);
if (!allFileDataFromJson.isEmpty()) {
- MessageMetaData messageMetaData = optionalMessageMetaData.get();
return Mono.just(ImmutableFileReadyMessage.builder() //
- .pnfName(messageMetaData.sourceName()) //
- .messageMetaData(messageMetaData) //
.files(allFileDataFromJson) //
.build());
} else {
@@ -152,13 +155,14 @@ public class JsonMessageParser {
}
}
- logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
+ logger.error("VES event parsing. Missing arrayOfNamedHashMap in message. {}", message);
return Mono.empty();
}
- logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
+ logger.error("VES event parsing. FileReady event has incorrect JsonObject. {}", message);
return Mono.empty();
}
+
private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
@@ -185,7 +189,7 @@ public class JsonMessageParser {
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
- String errorMessage = "Unable to collect file from xNF.";
+ String errorMessage = "VES event parsing.";
if (!missingValues.isEmpty()) {
errorMessage += " Missing data: " + missingValues;
}
@@ -206,11 +210,11 @@ public class JsonMessageParser {
return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
- private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) {
+ private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileInfo);
+ Optional<FileData> fileData = getFileDataFromJson(fileInfo, messageMetaData);
if (fileData.isPresent()) {
res.add(fileData.get());
@@ -219,7 +223,7 @@ public class JsonMessageParser {
return res;
}
- private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) {
+ private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
@@ -230,7 +234,7 @@ public class JsonMessageParser {
try {
scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
} catch (Exception e) {
- logger.error("Unable to collect file from xNF.", e);
+ logger.error("VES event parsing.", e);
return Optional.empty();
}
FileData fileData = ImmutableFileData.builder() //
@@ -240,12 +244,12 @@ public class JsonMessageParser {
.location(location) //
.scheme(scheme) //
.compression(getValueFromJson(data, COMPRESSION, missingValues)) //
+ .messageMetaData(messageMetaData)
.build();
if (missingValues.isEmpty()) {
return Optional.of(fileData);
}
- logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}",
- missingValues, fileInfo);
+ logger.error("VES event parsing. File information wrong. Missing data: {} Data: {}", missingValues, fileInfo);
return Optional.empty();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
index f6daf733..49e2f01e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
@@ -30,8 +30,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
+
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -39,25 +40,21 @@ import reactor.core.publisher.Flux;
public class DMaaPMessageConsumerTask {
private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
- private AppConfig datafileAppConfig;
- private JsonMessageParser jsonMessageParser;
- private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
+ private final JsonMessageParser jsonMessageParser;
+ private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
- this.datafileAppConfig = datafileAppConfig;
this.jsonMessageParser = new JsonMessageParser();
+ this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
- protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig,
- DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+ protected DMaaPMessageConsumerTask(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
JsonMessageParser messageParser) {
- this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
this.jsonMessageParser = messageParser;
}
public Flux<FileReadyMessage> execute() {
- dmaaPConsumerReactiveHttpClient = resolveClient();
logger.trace("execute called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
@@ -67,15 +64,10 @@ public class DMaaPMessageConsumerTask {
return jsonMessageParser.getMessagesFromJson(message);
}
- protected DmaapConsumerConfiguration resolveConfiguration() {
- return datafileAppConfig.getDmaapConsumerConfiguration();
+ private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
+ DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
+ WebClient client = new DmaapReactiveWebClient().fromConfiguration(config).build();
+ return new DMaaPConsumerReactiveHttpClient(config, client);
}
- protected DMaaPConsumerReactiveHttpClient resolveClient() {
- return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
- }
-
- protected WebClient buildWebClient() {
- return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 0fef9ab4..4207d1fc 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -84,12 +84,13 @@ public class DataRouterPublisher {
* @param model information about the file to publish
* @param numRetries the maximal number of retries if the publishing fails
* @param firstBackoff the time to delay the first retry
- * @return the HTTP response status as a string
+ * @param contextMap tracing context variables
+ * @return the (same) ConsumerDmaapModel
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.trace("Method called with arg {}", model);
+ logger.trace("Publish called with arg {}", model);
dmaapProducerReactiveHttpClient = resolveClient();
return Mono.just(model)
@@ -117,7 +118,7 @@ public class DataRouterPublisher {
logger.trace(response.toString());
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
return Mono.error(e);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index fb27a579..8849b45e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -49,31 +49,31 @@ public class FileCollector {
this.datafileAppConfig = datafileAppConfig;
}
- public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
- Duration firstBackoffTimeout, Map<String, String> contextMap) {
+ public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout,
+ Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
return Mono.just(fileData) //
.cache() //
- .flatMap(fd -> collectFile(fileData, metaData, contextMap)) //
+ .flatMap(fd -> collectFile(fileData, contextMap)) //
.retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
}
- private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
- Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.trace("starting to collectFile");
+ logger.trace("starting to collectFile {}", fileData.name());
final String remoteFile = fileData.remoteFilePath();
final Path localFile = fileData.getLocalFileName();
try (FileCollectClient currentClient = createClient(fileData)) {
+ currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
+ return Mono.just(getConsumerDmaapModel(fileData, localFile));
} catch (Exception throwable) {
- logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable);
+ logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString());
return Mono.error(throwable);
}
}
@@ -89,9 +89,9 @@ public class FileCollector {
}
}
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
+ private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) {
String location = fileData.location();
-
+ MessageMetaData metaData = fileData.messageMetaData();
return ImmutableConsumerDmaapModel.builder() //
.productName(metaData.productName()) //
.vendorName(metaData.vendorName()) //
@@ -108,16 +108,13 @@ public class FileCollector {
.build();
}
- SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
- SftpClient client = new SftpClient(fileData.fileServerData());
- client.open();
- return client;
+ protected SftpClient createSftpClient(FileData fileData) {
+ return new SftpClient(fileData.fileServerData());
}
- FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException {
+ protected FtpsClient createFtpsClient(FileData fileData) {
FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
- FtpsClient client = new FtpsClient(fileData.fileServerData());
- client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword());
- return client;
+ return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(),
+ Paths.get(config.trustedCA()), config.trustedCAPassword());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index 0729caa0..41f8e3cd 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -54,7 +54,7 @@ public class PublishedChecker {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private AppConfig appConfig;
+ private final AppConfig appConfig;
/**
* Constructor.
@@ -88,12 +88,13 @@ public class PublishedChecker {
HttpResponse response =
producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
- logger.trace(response.toString());
+ logger.trace("{}", response);
int status = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
- InputStream content = entity.getContent();
- String body = IOUtils.toString(content);
- return HttpStatus.SC_OK == status && !"[]".equals(body);
+ try (InputStream content = entity.getContent()) {
+ String body = IOUtils.toString(content);
+ return HttpStatus.SC_OK == status && !"[]".equals(body);
+ }
} catch (Exception e) {
logger.warn("Unable to check if file has been published.", e);
return false;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index d41e5c25..b4096c73 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -20,8 +20,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,7 +27,6 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
@@ -44,32 +41,24 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
- * files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
- private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
- private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
-
- /** Data needed for fetching of one file. */
- private class FileCollectionData {
- final FileData fileData;
- final MessageMetaData metaData;
-
- FileCollectionData(FileData fd, MessageMetaData metaData) {
- this.fileData = fd;
- this.metaData = metaData;
- }
- }
+ private static final int NUMBER_OF_WORKER_THREADS = 100;
+ private static final int MAX_TASKS_FOR_POLLING = 50;
+ private static final long DATA_ROUTER_MAX_RETRIES = 5;
+ private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
+ private static final long FILE_TRANSFER_MAX_RETRIES = 3;
+ private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(5);
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
- private final Scheduler scheduler =
- Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS);
+ private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
/**
@@ -86,21 +75,25 @@ public class ScheduledTasks {
* Main function for scheduling for the file collection Workflow.
*/
public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.trace("Execution of tasks was registered");
- applicationConfiguration.loadConfigurationFromFile();
- createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
- () -> onComplete(contextMap));
+ try {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.trace("Execution of tasks was registered");
+ applicationConfiguration.loadConfigurationFromFile();
+ createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
+ () -> onComplete(contextMap));
+ } catch (Exception e) {
+ logger.error("Unexpected exception: ", e);
+ }
}
Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
- .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
+ .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
- .flatMap(this::createFileCollectionTask) //
+ .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.filter(fileData -> shouldBePublished(fileData, contextMap)) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
+ .flatMap(fileData -> fetchFile(fileData, contextMap)) //
.flatMap(model -> publishToDataRouter(model, contextMap)) //
.doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
@@ -114,63 +107,61 @@ public class ScheduledTasks {
alreadyPublishedFiles.purge(now);
}
- private void onComplete(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Datafile tasks have been completed");
+ protected PublishedChecker createPublishedChecker() {
+ return new PublishedChecker(applicationConfiguration);
}
- private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Datafile consumed tasks {}", model.getInternalLocation());
+ protected int getCurrentNumberOfTasks() {
+ return currentNumberOfTasks.get();
}
- private void onError(Throwable throwable, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
+ protected DMaaPMessageConsumerTask createConsumerTask() {
+ return new DMaaPMessageConsumerTask(this.applicationConfiguration);
}
- private int getParallelism() {
- if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) {
- return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks();
- } else {
- return 1; // We need at least one rail/thread
- }
+ protected FileCollector createFileCollector() {
+ return new FileCollector(applicationConfiguration);
+ }
+
+ protected DataRouterPublisher createDataRouterPublisher() {
+ return new DataRouterPublisher(applicationConfiguration);
}
- private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
- List<FileCollectionData> fileCollects = new ArrayList<>();
+ private void onComplete(Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.trace("Datafile tasks have been completed");
+ }
- for (FileData fileData : availableFiles.files()) {
- fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData()));
- }
- return Flux.fromIterable(fileCollects);
+ private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.info("Datafile file published {}", model.getInternalLocation());
}
- private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) {
+ private void onError(Throwable throwable, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
+ }
+
+ private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) {
boolean result = false;
- Path localFileName = task.fileData.getLocalFileName();
+ Path localFileName = fileData.getLocalFileName();
if (alreadyPublishedFiles.put(localFileName) == null) {
result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap);
}
return result;
}
- private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
- Map<String, String> contextMap) {
- final long maxNUmberOfRetries = 3;
- final Duration initialRetryTimeout = Duration.ofSeconds(5);
-
+ private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
return createFileCollector()
- .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
- contextMap)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
+ .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
Path localFileName = fileData.getLocalFileName();
- logger.error("File fetching failed: {}", localFileName);
+ logger.error("File fetching failed, fileData {}", fileData);
deleteFile(localFileName, contextMap);
alreadyPublishedFiles.remove(localFileName);
currentNumberOfTasks.decrementAndGet();
@@ -178,19 +169,16 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
- final long maxNumberOfRetries = 3;
- final Duration initialRetryTimeout = Duration.ofSeconds(5);
-
-
MdcVariables.setMdcContextMap(contextMap);
- return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
- .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
+
+ return createDataRouterPublisher()
+ .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .onErrorResume(exception -> handlePublishFailure(model, contextMap));
}
- private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception,
- Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
+ logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
@@ -202,8 +190,9 @@ public class ScheduledTasks {
* Fetch more messages from the message router. This is done in a polling/blocking fashion.
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
- logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
- if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
+ if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
+ logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks());
return Flux.empty();
}
@@ -215,7 +204,8 @@ public class ScheduledTasks {
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.error("Polling for file ready message failed, exception: {}", exception);
+ logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
+ this.applicationConfiguration.getDmaapConsumerConfiguration());
return Flux.empty();
}
@@ -229,24 +219,4 @@ public class ScheduledTasks {
}
}
- PublishedChecker createPublishedChecker() {
- return new PublishedChecker(applicationConfiguration);
- }
-
- int getCurrentNumberOfTasks() {
- return currentNumberOfTasks.get();
- }
-
- DMaaPMessageConsumerTask createConsumerTask() {
- return new DMaaPMessageConsumerTask(this.applicationConfiguration);
- }
-
- FileCollector createFileCollector() {
- return new FileCollector(applicationConfiguration);
- }
-
- DataRouterPublisher createDataRouterPublisher() {
- return new DataRouterPublisher(applicationConfiguration);
- }
-
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
index 1f5827c8..84c5e07b 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
@@ -44,6 +44,20 @@ public class FileDataTest {
private static final String LOCATION_WITHOUT_USER =
FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+ private MessageMetaData messageMetaData() {
+ return ImmutableMessageMetaData.builder()
+ .productName("PRODUCT_NAME")
+ .vendorName("VENDOR_NAME")
+ .lastEpochMicrosec("LAST_EPOCH_MICROSEC")
+ .sourceName("SOURCE_NAME")
+ .startEpochMicrosec("START_EPOCH_MICROSEC")
+ .timeZoneOffset("TIME_ZONE_OFFSET")
+ .changeIdentifier("PM_MEAS_CHANGE_IDENTIFIER")
+ .changeType("FILE_READY_CHANGE_TYPE")
+ .build();
+ }
+
private FileData properFileDataWithUser() {
// @formatter:off
return ImmutableFileData.builder()
@@ -53,6 +67,7 @@ public class FileDataTest {
.fileFormatType("type")
.fileFormatVersion("version")
.scheme(Scheme.FTPS)
+ .messageMetaData(messageMetaData())
.build();
// @formatter:on
}
@@ -66,6 +81,7 @@ public class FileDataTest {
.fileFormatType("type")
.fileFormatVersion("version")
.scheme(Scheme.FTPS)
+ .messageMetaData(messageMetaData())
.build();
// @formatter:on
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index b33180fa..b8aa7da2 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -29,7 +29,6 @@ import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -67,7 +66,7 @@ class JsonMessageParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
+ void whenPassingCorrectJson_oneFileReadyMessage() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
@@ -100,12 +99,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -121,7 +119,7 @@ class JsonMessageParserTest {
}
@Test
- void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEvents_twoMessages() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
@@ -154,12 +152,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -202,7 +199,7 @@ class JsonMessageParserTest {
}
@Test
- void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
@@ -235,12 +232,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -421,12 +417,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index a695e20d..98c7dc32 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -20,7 +20,6 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -156,13 +155,12 @@ public class DMaaPMessageConsumerTaskImplTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(ftpesFileData);
expectedFtpesMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -187,6 +185,7 @@ public class DMaaPMessageConsumerTaskImplTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
@@ -208,8 +207,6 @@ public class DMaaPMessageConsumerTaskImplTest {
files = new ArrayList<>();
files.add(sftpFileData);
expectedSftpMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
}
@@ -264,8 +261,6 @@ public class DMaaPMessageConsumerTaskImplTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumerTask = spy(new DMaaPMessageConsumerTask(appConfig, httpClientMock, jsonMessageParserMock));
- when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
- doReturn(httpClientMock).when(messageConsumerTask).resolveClient();
+ messageConsumerTask = spy(new DMaaPMessageConsumerTask(httpClientMock, jsonMessageParserMock));
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index ed8b93f1..fe867738 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -86,13 +86,11 @@ class DataRouterPublisherTest {
private static final String FEED_ID = "1";
private static final String FILE_CONTENT = "Just a string.";
- private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
-
private static ConsumerDmaapModel consumerDmaapModel;
private static DmaapProducerReactiveHttpClient httpClientMock;
private static AppConfig appConfig;
private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
-
+ private final Map<String, String> contextMap = new HashMap<>();
private static DataRouterPublisher publisherTaskUnderTestSpy;
/**
@@ -125,9 +123,8 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
-
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel) //
.verifyComplete();
@@ -170,7 +167,7 @@ class DataRouterPublisherTest {
prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel) //
.verifyComplete();
}
@@ -181,7 +178,7 @@ class DataRouterPublisherTest {
Integer.valueOf(HttpStatus.OK.value()));
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel) //
.verifyComplete();
@@ -197,7 +194,7 @@ class DataRouterPublisherTest {
Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 1/1") //
.verify();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index fb49c860..6e17f27b 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -67,7 +67,7 @@ public class FileCollectorTest {
private static final int PORT_22 = 22;
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME);
+ private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SOURCE_NAME, PM_FILE_NAME);
private static final String USER = "usr";
private static final String PWD = "pwd";
private static final String FTPES_LOCATION =
@@ -93,6 +93,7 @@ public class FileCollectorTest {
private FtpsClient ftpsClientMock = mock(FtpsClient.class);
private SftpClient sftpClientMock = mock(SftpClient.class);
+ private final Map<String, String> contextMap = new HashMap<>();
private MessageMetaData createMessageMetaData() {
@@ -119,6 +120,7 @@ public class FileCollectorTest {
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.scheme(scheme)
+ .messageMetaData(createMessageMetaData())
.build();
// @formatter:on
}
@@ -160,11 +162,11 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
- Map<String, String> contextMap = new HashMap<>();
StepVerifier.create(
- collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
+ verify(ftpsClientMock, times(1)).open();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verify(ftpsClientMock, times(1)).close();
@@ -176,12 +178,12 @@ public class FileCollectorTest {
FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any());
+
FileData fileData = createFileData(SFTP_LOCATION_NO_PORT, Scheme.SFTP);
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT);
- Map<String, String> contextMap = new HashMap<>();
StepVerifier
- .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0),
contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
@@ -191,11 +193,12 @@ public class FileCollectorTest {
expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
StepVerifier
- .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0),
contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
+ verify(sftpClientMock, times(2)).open();
verify(sftpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verify(sftpClientMock, times(2)).close();
verifyNoMoreInteractions(sftpClientMock);
@@ -210,9 +213,8 @@ public class FileCollectorTest {
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- Map<String, String> contextMap = new HashMap<>();
StepVerifier.create(
- collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 3/3").verify();
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -228,9 +230,9 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
- Map<String, String> contextMap = new HashMap<>();
+
StepVerifier.create(
- collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index d781cea3..f6beae02 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -30,8 +30,10 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.nio.file.Paths;
import java.time.Duration;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -64,6 +66,7 @@ public class ScheduledTasksTest {
private PublishedChecker publishedCheckerMock;
private FileCollector fileCollectorMock;
private DataRouterPublisher dataRouterMock;
+ private Map<String, String> contextMap = new HashMap<String, String>();
@BeforeEach
private void setUp() {
@@ -115,6 +118,7 @@ public class ScheduledTasksTest {
.location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
.scheme(Scheme.FTPS) //
.compression("") //
+ .messageMetaData(messageMetaData())
.build();
}
@@ -130,9 +134,7 @@ public class ScheduledTasksTest {
}
private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
- MessageMetaData md = messageMetaData();
- return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
- .files(files(numberOfFiles, uniqueNames)).build();
+ return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
}
private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
@@ -185,17 +187,17 @@ public class ScheduledTasksTest {
doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(noOfFiles) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -215,19 +217,19 @@ public class ScheduledTasksTest {
// First file collect will fail, 3 will succeed
doReturn(error, collectedFile, collectedFile, collectedFile) //
.when(fileCollectorMock) //
- .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
+ .execute(any(FileData.class), anyLong(), any(Duration.class), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(3) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -243,7 +245,7 @@ public class ScheduledTasksTest {
doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
Mono<Object> error = Mono.error(new Exception("problem"));
// One publish will fail, the rest will succeed
@@ -251,14 +253,14 @@ public class ScheduledTasksTest {
.when(dataRouterMock) //
.execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(3) // 3 completed files
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -277,17 +279,17 @@ public class ScheduledTasksTest {
doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(1) // 99 is skipped
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(1)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);