summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-03-22 14:29:29 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-03-22 14:29:29 +0000
commite1a66425d3ba1df5ae2a8f2b99168707e08b655a (patch)
treebdfcbdbe4a3dd7286a562f974dec61691e216848 /datafile-app-server/src
parent4bd281390ed24b278846775c1157f82db81fddbe (diff)
Local filename updated, stability issues
The local filename is changed so it contains PNF name instead of the PNF IP address. The paralellism is restricted to 100 worker threads in order to solve problems with too many open file descriptors and out of memory. Logging is improved. Change-Id: I24ce2e23020cc253a3c7bebac1ab5cf703b5b144 Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java5
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java36
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java28
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java7
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java33
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java11
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java156
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java16
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java19
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java11
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java13
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java22
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java32
17 files changed, 204 insertions, 253 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index f89a1012..a30d2826 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -112,10 +112,8 @@ public class AppConfig {
setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
}
- } catch (IOException e) {
- logger.error("Problem with file loading, file: {}", filepath, e);
- } catch (JsonSyntaxException e) {
- logger.error("Problem with Json deserialization", e);
+ } catch (JsonSyntaxException | IOException e) {
+ logger.error("Problem with loading configuration, file: {}", filepath, e);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 52723330..d5c8b3b2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -63,7 +63,7 @@ public class SchedulerConfig {
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
private static final Marker EXIT = MarkerFactory.getMarker("EXIT");
- private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
+ private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private Map<String, String> contextMap;
private final TaskScheduler taskScheduler;
@@ -97,8 +97,7 @@ public class SchedulerConfig {
MdcVariables.setMdcContextMap(contextMap);
logger.info(EXIT, "Stopped Datafile workflow");
MDC.clear();
- return Mono.defer(() -> Mono
- .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+ return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED));
}
/**
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
deleted file mode 100644
index 36279016..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.exceptions;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- */
-public class DmaapNotFoundException extends DatafileTaskException {
-
- private static final long serialVersionUID = 1L;
-
- public DmaapNotFoundException(String message) {
- super(message);
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index bdb47b2b..d0d1f91a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -37,10 +37,19 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
public abstract class FileData {
private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
+ /**
+ * @return the file name with no path
+ */
public abstract String name();
+ /**
+ * @return the URL to use to fetch the file from the PNF
+ */
public abstract String location();
+ /**
+ * @return the file transfer protocol to use for fetching the file
+ */
public abstract Scheme scheme();
public abstract String compression();
@@ -49,17 +58,25 @@ public abstract class FileData {
public abstract String fileFormatVersion();
+ public abstract MessageMetaData messageMetaData();
+
+ /**
+ * @return the name of the PNF, must be unique in the network
+ */
+ public String sourceName() {
+ return messageMetaData().sourceName();
+ }
+
public String remoteFilePath() {
return URI.create(location()).getPath();
}
public Path getLocalFileName() {
- URI uri = URI.create(location());
- return createLocalFileName(uri.getHost(), name());
+ return createLocalFileName(messageMetaData().sourceName(), name());
}
- public static Path createLocalFileName(String host, String fileName) {
- return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName);
+ public static Path createLocalFileName(String sourceName, String fileName) {
+ return Paths.get(DATAFILE_TMPDIR, sourceName + "_" + fileName);
}
public FileServerData fileServerData() {
@@ -86,4 +103,4 @@ public abstract class FileData {
}
return Optional.empty();
}
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index e3293faa..9373a4f2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -31,9 +31,5 @@ import org.immutables.value.Value;
@Value.Immutable
@Gson.TypeAdapters
public interface FileReadyMessage {
- public String pnfName();
-
- public MessageMetaData messageMetaData();
-
public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index af45cc99..a3595ecf 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -100,9 +100,13 @@ public class JsonMessageParser {
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
JsonParser jsonParser = new JsonParser();
- return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
- : element.isJsonObject() ? Optional.of((JsonObject) element)
- : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ if (element.isJsonPrimitive()) {
+ return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject());
+ } else if (element.isJsonObject()) {
+ return Optional.of((JsonObject) element);
+ } else {
+ return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ }
}
private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
@@ -133,18 +137,17 @@ public class JsonMessageParser {
: logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
+
private Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
+ MessageMetaData messageMetaData = optionalMessageMetaData.get();
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (arrayOfNamedHashMap != null) {
- List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
+ List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap, messageMetaData);
if (!allFileDataFromJson.isEmpty()) {
- MessageMetaData messageMetaData = optionalMessageMetaData.get();
return Mono.just(ImmutableFileReadyMessage.builder() //
- .pnfName(messageMetaData.sourceName()) //
- .messageMetaData(messageMetaData) //
.files(allFileDataFromJson) //
.build());
} else {
@@ -152,13 +155,14 @@ public class JsonMessageParser {
}
}
- logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
+ logger.error("VES event parsing. Missing arrayOfNamedHashMap in message. {}", message);
return Mono.empty();
}
- logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
+ logger.error("VES event parsing. FileReady event has incorrect JsonObject. {}", message);
return Mono.empty();
}
+
private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
@@ -185,7 +189,7 @@ public class JsonMessageParser {
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
- String errorMessage = "Unable to collect file from xNF.";
+ String errorMessage = "VES event parsing.";
if (!missingValues.isEmpty()) {
errorMessage += " Missing data: " + missingValues;
}
@@ -206,11 +210,11 @@ public class JsonMessageParser {
return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
- private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) {
+ private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileInfo);
+ Optional<FileData> fileData = getFileDataFromJson(fileInfo, messageMetaData);
if (fileData.isPresent()) {
res.add(fileData.get());
@@ -219,7 +223,7 @@ public class JsonMessageParser {
return res;
}
- private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) {
+ private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
@@ -230,7 +234,7 @@ public class JsonMessageParser {
try {
scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
} catch (Exception e) {
- logger.error("Unable to collect file from xNF.", e);
+ logger.error("VES event parsing.", e);
return Optional.empty();
}
FileData fileData = ImmutableFileData.builder() //
@@ -240,12 +244,12 @@ public class JsonMessageParser {
.location(location) //
.scheme(scheme) //
.compression(getValueFromJson(data, COMPRESSION, missingValues)) //
+ .messageMetaData(messageMetaData)
.build();
if (missingValues.isEmpty()) {
return Optional.of(fileData);
}
- logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}",
- missingValues, fileInfo);
+ logger.error("VES event parsing. File information wrong. Missing data: {} Data: {}", missingValues, fileInfo);
return Optional.empty();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
index f6daf733..49e2f01e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
@@ -30,8 +30,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
+
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -39,25 +40,21 @@ import reactor.core.publisher.Flux;
public class DMaaPMessageConsumerTask {
private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
- private AppConfig datafileAppConfig;
- private JsonMessageParser jsonMessageParser;
- private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
+ private final JsonMessageParser jsonMessageParser;
+ private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
- this.datafileAppConfig = datafileAppConfig;
this.jsonMessageParser = new JsonMessageParser();
+ this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
- protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig,
- DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+ protected DMaaPMessageConsumerTask(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
JsonMessageParser messageParser) {
- this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
this.jsonMessageParser = messageParser;
}
public Flux<FileReadyMessage> execute() {
- dmaaPConsumerReactiveHttpClient = resolveClient();
logger.trace("execute called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
@@ -67,15 +64,10 @@ public class DMaaPMessageConsumerTask {
return jsonMessageParser.getMessagesFromJson(message);
}
- protected DmaapConsumerConfiguration resolveConfiguration() {
- return datafileAppConfig.getDmaapConsumerConfiguration();
+ private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
+ DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
+ WebClient client = new DmaapReactiveWebClient().fromConfiguration(config).build();
+ return new DMaaPConsumerReactiveHttpClient(config, client);
}
- protected DMaaPConsumerReactiveHttpClient resolveClient() {
- return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
- }
-
- protected WebClient buildWebClient() {
- return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 0fef9ab4..4207d1fc 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -84,12 +84,13 @@ public class DataRouterPublisher {
* @param model information about the file to publish
* @param numRetries the maximal number of retries if the publishing fails
* @param firstBackoff the time to delay the first retry
- * @return the HTTP response status as a string
+ * @param contextMap tracing context variables
+ * @return the (same) ConsumerDmaapModel
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.trace("Method called with arg {}", model);
+ logger.trace("Publish called with arg {}", model);
dmaapProducerReactiveHttpClient = resolveClient();
return Mono.just(model)
@@ -117,7 +118,7 @@ public class DataRouterPublisher {
logger.trace(response.toString());
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
return Mono.error(e);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index fb27a579..8849b45e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -49,31 +49,31 @@ public class FileCollector {
this.datafileAppConfig = datafileAppConfig;
}
- public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
- Duration firstBackoffTimeout, Map<String, String> contextMap) {
+ public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout,
+ Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
return Mono.just(fileData) //
.cache() //
- .flatMap(fd -> collectFile(fileData, metaData, contextMap)) //
+ .flatMap(fd -> collectFile(fileData, contextMap)) //
.retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
}
- private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
- Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.trace("starting to collectFile");
+ logger.trace("starting to collectFile {}", fileData.name());
final String remoteFile = fileData.remoteFilePath();
final Path localFile = fileData.getLocalFileName();
try (FileCollectClient currentClient = createClient(fileData)) {
+ currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
+ return Mono.just(getConsumerDmaapModel(fileData, localFile));
} catch (Exception throwable) {
- logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable);
+ logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString());
return Mono.error(throwable);
}
}
@@ -89,9 +89,9 @@ public class FileCollector {
}
}
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
+ private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) {
String location = fileData.location();
-
+ MessageMetaData metaData = fileData.messageMetaData();
return ImmutableConsumerDmaapModel.builder() //
.productName(metaData.productName()) //
.vendorName(metaData.vendorName()) //
@@ -108,16 +108,13 @@ public class FileCollector {
.build();
}
- SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
- SftpClient client = new SftpClient(fileData.fileServerData());
- client.open();
- return client;
+ protected SftpClient createSftpClient(FileData fileData) {
+ return new SftpClient(fileData.fileServerData());
}
- FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException {
+ protected FtpsClient createFtpsClient(FileData fileData) {
FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
- FtpsClient client = new FtpsClient(fileData.fileServerData());
- client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword());
- return client;
+ return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(),
+ Paths.get(config.trustedCA()), config.trustedCAPassword());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index 0729caa0..41f8e3cd 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -54,7 +54,7 @@ public class PublishedChecker {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private AppConfig appConfig;
+ private final AppConfig appConfig;
/**
* Constructor.
@@ -88,12 +88,13 @@ public class PublishedChecker {
HttpResponse response =
producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
- logger.trace(response.toString());
+ logger.trace("{}", response);
int status = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
- InputStream content = entity.getContent();
- String body = IOUtils.toString(content);
- return HttpStatus.SC_OK == status && !"[]".equals(body);
+ try (InputStream content = entity.getContent()) {
+ String body = IOUtils.toString(content);
+ return HttpStatus.SC_OK == status && !"[]".equals(body);
+ }
} catch (Exception e) {
logger.warn("Unable to check if file has been published.", e);
return false;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index d41e5c25..b4096c73 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -20,8 +20,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,7 +27,6 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
@@ -44,32 +41,24 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
- * files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
- private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
- private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
-
- /** Data needed for fetching of one file. */
- private class FileCollectionData {
- final FileData fileData;
- final MessageMetaData metaData;
-
- FileCollectionData(FileData fd, MessageMetaData metaData) {
- this.fileData = fd;
- this.metaData = metaData;
- }
- }
+ private static final int NUMBER_OF_WORKER_THREADS = 100;
+ private static final int MAX_TASKS_FOR_POLLING = 50;
+ private static final long DATA_ROUTER_MAX_RETRIES = 5;
+ private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
+ private static final long FILE_TRANSFER_MAX_RETRIES = 3;
+ private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(5);
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
- private final Scheduler scheduler =
- Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS);
+ private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
/**
@@ -86,21 +75,25 @@ public class ScheduledTasks {
* Main function for scheduling for the file collection Workflow.
*/
public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.trace("Execution of tasks was registered");
- applicationConfiguration.loadConfigurationFromFile();
- createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
- () -> onComplete(contextMap));
+ try {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.trace("Execution of tasks was registered");
+ applicationConfiguration.loadConfigurationFromFile();
+ createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
+ () -> onComplete(contextMap));
+ } catch (Exception e) {
+ logger.error("Unexpected exception: ", e);
+ }
}
Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
- .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
+ .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
- .flatMap(this::createFileCollectionTask) //
+ .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.filter(fileData -> shouldBePublished(fileData, contextMap)) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
+ .flatMap(fileData -> fetchFile(fileData, contextMap)) //
.flatMap(model -> publishToDataRouter(model, contextMap)) //
.doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
@@ -114,63 +107,61 @@ public class ScheduledTasks {
alreadyPublishedFiles.purge(now);
}
- private void onComplete(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Datafile tasks have been completed");
+ protected PublishedChecker createPublishedChecker() {
+ return new PublishedChecker(applicationConfiguration);
}
- private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Datafile consumed tasks {}", model.getInternalLocation());
+ protected int getCurrentNumberOfTasks() {
+ return currentNumberOfTasks.get();
}
- private void onError(Throwable throwable, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
+ protected DMaaPMessageConsumerTask createConsumerTask() {
+ return new DMaaPMessageConsumerTask(this.applicationConfiguration);
}
- private int getParallelism() {
- if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) {
- return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks();
- } else {
- return 1; // We need at least one rail/thread
- }
+ protected FileCollector createFileCollector() {
+ return new FileCollector(applicationConfiguration);
+ }
+
+ protected DataRouterPublisher createDataRouterPublisher() {
+ return new DataRouterPublisher(applicationConfiguration);
}
- private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
- List<FileCollectionData> fileCollects = new ArrayList<>();
+ private void onComplete(Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.trace("Datafile tasks have been completed");
+ }
- for (FileData fileData : availableFiles.files()) {
- fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData()));
- }
- return Flux.fromIterable(fileCollects);
+ private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.info("Datafile file published {}", model.getInternalLocation());
}
- private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) {
+ private void onError(Throwable throwable, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
+ }
+
+ private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) {
boolean result = false;
- Path localFileName = task.fileData.getLocalFileName();
+ Path localFileName = fileData.getLocalFileName();
if (alreadyPublishedFiles.put(localFileName) == null) {
result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap);
}
return result;
}
- private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
- Map<String, String> contextMap) {
- final long maxNUmberOfRetries = 3;
- final Duration initialRetryTimeout = Duration.ofSeconds(5);
-
+ private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
return createFileCollector()
- .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
- contextMap)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
+ .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
Path localFileName = fileData.getLocalFileName();
- logger.error("File fetching failed: {}", localFileName);
+ logger.error("File fetching failed, fileData {}", fileData);
deleteFile(localFileName, contextMap);
alreadyPublishedFiles.remove(localFileName);
currentNumberOfTasks.decrementAndGet();
@@ -178,19 +169,16 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
- final long maxNumberOfRetries = 3;
- final Duration initialRetryTimeout = Duration.ofSeconds(5);
-
-
MdcVariables.setMdcContextMap(contextMap);
- return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
- .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
+
+ return createDataRouterPublisher()
+ .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .onErrorResume(exception -> handlePublishFailure(model, contextMap));
}
- private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception,
- Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
+ logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
@@ -202,8 +190,9 @@ public class ScheduledTasks {
* Fetch more messages from the message router. This is done in a polling/blocking fashion.
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
- logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
- if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
+ if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
+ logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks());
return Flux.empty();
}
@@ -215,7 +204,8 @@ public class ScheduledTasks {
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.error("Polling for file ready message failed, exception: {}", exception);
+ logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
+ this.applicationConfiguration.getDmaapConsumerConfiguration());
return Flux.empty();
}
@@ -229,24 +219,4 @@ public class ScheduledTasks {
}
}
- PublishedChecker createPublishedChecker() {
- return new PublishedChecker(applicationConfiguration);
- }
-
- int getCurrentNumberOfTasks() {
- return currentNumberOfTasks.get();
- }
-
- DMaaPMessageConsumerTask createConsumerTask() {
- return new DMaaPMessageConsumerTask(this.applicationConfiguration);
- }
-
- FileCollector createFileCollector() {
- return new FileCollector(applicationConfiguration);
- }
-
- DataRouterPublisher createDataRouterPublisher() {
- return new DataRouterPublisher(applicationConfiguration);
- }
-
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
index 1f5827c8..84c5e07b 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
@@ -44,6 +44,20 @@ public class FileDataTest {
private static final String LOCATION_WITHOUT_USER =
FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+ private MessageMetaData messageMetaData() {
+ return ImmutableMessageMetaData.builder()
+ .productName("PRODUCT_NAME")
+ .vendorName("VENDOR_NAME")
+ .lastEpochMicrosec("LAST_EPOCH_MICROSEC")
+ .sourceName("SOURCE_NAME")
+ .startEpochMicrosec("START_EPOCH_MICROSEC")
+ .timeZoneOffset("TIME_ZONE_OFFSET")
+ .changeIdentifier("PM_MEAS_CHANGE_IDENTIFIER")
+ .changeType("FILE_READY_CHANGE_TYPE")
+ .build();
+ }
+
private FileData properFileDataWithUser() {
// @formatter:off
return ImmutableFileData.builder()
@@ -53,6 +67,7 @@ public class FileDataTest {
.fileFormatType("type")
.fileFormatVersion("version")
.scheme(Scheme.FTPS)
+ .messageMetaData(messageMetaData())
.build();
// @formatter:on
}
@@ -66,6 +81,7 @@ public class FileDataTest {
.fileFormatType("type")
.fileFormatVersion("version")
.scheme(Scheme.FTPS)
+ .messageMetaData(messageMetaData())
.build();
// @formatter:on
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index b33180fa..b8aa7da2 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -29,7 +29,6 @@ import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -67,7 +66,7 @@ class JsonMessageParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
+ void whenPassingCorrectJson_oneFileReadyMessage() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
@@ -100,12 +99,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -121,7 +119,7 @@ class JsonMessageParserTest {
}
@Test
- void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEvents_twoMessages() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
@@ -154,12 +152,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -202,7 +199,7 @@ class JsonMessageParserTest {
}
@Test
- void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
@@ -235,12 +232,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -421,12 +417,11 @@ class JsonMessageParserTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index a695e20d..98c7dc32 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -20,7 +20,6 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -156,13 +155,12 @@ public class DMaaPMessageConsumerTaskImplTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(ftpesFileData);
expectedFtpesMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
@@ -187,6 +185,7 @@ public class DMaaPMessageConsumerTaskImplTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
@@ -208,8 +207,6 @@ public class DMaaPMessageConsumerTaskImplTest {
files = new ArrayList<>();
files.add(sftpFileData);
expectedSftpMessage = ImmutableFileReadyMessage.builder() //
- .pnfName(SOURCE_NAME) //
- .messageMetaData(messageMetaData) //
.files(files) //
.build();
}
@@ -264,8 +261,6 @@ public class DMaaPMessageConsumerTaskImplTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumerTask = spy(new DMaaPMessageConsumerTask(appConfig, httpClientMock, jsonMessageParserMock));
- when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
- doReturn(httpClientMock).when(messageConsumerTask).resolveClient();
+ messageConsumerTask = spy(new DMaaPMessageConsumerTask(httpClientMock, jsonMessageParserMock));
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index ed8b93f1..fe867738 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -86,13 +86,11 @@ class DataRouterPublisherTest {
private static final String FEED_ID = "1";
private static final String FILE_CONTENT = "Just a string.";
- private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
-
private static ConsumerDmaapModel consumerDmaapModel;
private static DmaapProducerReactiveHttpClient httpClientMock;
private static AppConfig appConfig;
private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
-
+ private final Map<String, String> contextMap = new HashMap<>();
private static DataRouterPublisher publisherTaskUnderTestSpy;
/**
@@ -125,9 +123,8 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
-
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel) //
.verifyComplete();
@@ -170,7 +167,7 @@ class DataRouterPublisherTest {
prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel) //
.verifyComplete();
}
@@ -181,7 +178,7 @@ class DataRouterPublisherTest {
Integer.valueOf(HttpStatus.OK.value()));
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectNext(consumerDmaapModel) //
.verifyComplete();
@@ -197,7 +194,7 @@ class DataRouterPublisherTest {
Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
StepVerifier
- .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 1/1") //
.verify();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index fb49c860..6e17f27b 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -67,7 +67,7 @@ public class FileCollectorTest {
private static final int PORT_22 = 22;
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME);
+ private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SOURCE_NAME, PM_FILE_NAME);
private static final String USER = "usr";
private static final String PWD = "pwd";
private static final String FTPES_LOCATION =
@@ -93,6 +93,7 @@ public class FileCollectorTest {
private FtpsClient ftpsClientMock = mock(FtpsClient.class);
private SftpClient sftpClientMock = mock(SftpClient.class);
+ private final Map<String, String> contextMap = new HashMap<>();
private MessageMetaData createMessageMetaData() {
@@ -119,6 +120,7 @@ public class FileCollectorTest {
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.scheme(scheme)
+ .messageMetaData(createMessageMetaData())
.build();
// @formatter:on
}
@@ -160,11 +162,11 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
- Map<String, String> contextMap = new HashMap<>();
StepVerifier.create(
- collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
+ verify(ftpsClientMock, times(1)).open();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verify(ftpsClientMock, times(1)).close();
@@ -176,12 +178,12 @@ public class FileCollectorTest {
FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any());
+
FileData fileData = createFileData(SFTP_LOCATION_NO_PORT, Scheme.SFTP);
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT);
- Map<String, String> contextMap = new HashMap<>();
StepVerifier
- .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0),
contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
@@ -191,11 +193,12 @@ public class FileCollectorTest {
expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
StepVerifier
- .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0),
contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
+ verify(sftpClientMock, times(2)).open();
verify(sftpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verify(sftpClientMock, times(2)).close();
verifyNoMoreInteractions(sftpClientMock);
@@ -210,9 +213,8 @@ public class FileCollectorTest {
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- Map<String, String> contextMap = new HashMap<>();
StepVerifier.create(
- collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 3/3").verify();
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -228,9 +230,9 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
- Map<String, String> contextMap = new HashMap<>();
+
StepVerifier.create(
- collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index d781cea3..f6beae02 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -30,8 +30,10 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.nio.file.Paths;
import java.time.Duration;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -64,6 +66,7 @@ public class ScheduledTasksTest {
private PublishedChecker publishedCheckerMock;
private FileCollector fileCollectorMock;
private DataRouterPublisher dataRouterMock;
+ private Map<String, String> contextMap = new HashMap<String, String>();
@BeforeEach
private void setUp() {
@@ -115,6 +118,7 @@ public class ScheduledTasksTest {
.location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
.scheme(Scheme.FTPS) //
.compression("") //
+ .messageMetaData(messageMetaData())
.build();
}
@@ -130,9 +134,7 @@ public class ScheduledTasksTest {
}
private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
- MessageMetaData md = messageMetaData();
- return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
- .files(files(numberOfFiles, uniqueNames)).build();
+ return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
}
private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
@@ -185,17 +187,17 @@ public class ScheduledTasksTest {
doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(noOfFiles) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -215,19 +217,19 @@ public class ScheduledTasksTest {
// First file collect will fail, 3 will succeed
doReturn(error, collectedFile, collectedFile, collectedFile) //
.when(fileCollectorMock) //
- .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
+ .execute(any(FileData.class), anyLong(), any(Duration.class), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(3) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -243,7 +245,7 @@ public class ScheduledTasksTest {
doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
Mono<Object> error = Mono.error(new Exception("problem"));
// One publish will fail, the rest will succeed
@@ -251,14 +253,14 @@ public class ScheduledTasksTest {
.when(dataRouterMock) //
.execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(3) // 3 completed files
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -277,17 +279,17 @@ public class ScheduledTasksTest {
doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(1) // 99 is skipped
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(1)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);