Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java | 6
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java | 5
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java | 31
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java | 27
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java | 4
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java | 36
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java | 28
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java | 7
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java | 33
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java | 11
-rw-r--r--  datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java | 156
11 files changed, 144 insertions, 200 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index f89a1012..a30d2826 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -112,10 +112,8 @@ public class AppConfig {
setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
}
- } catch (IOException e) {
- logger.error("Problem with file loading, file: {}", filepath, e);
- } catch (JsonSyntaxException e) {
- logger.error("Problem with Json deserialization", e);
+ } catch (JsonSyntaxException | IOException e) {
+ logger.error("Problem with loading configuration, file: {}", filepath, e);
}
}
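For reference, a minimal standalone sketch (hypothetical class and method names, not from the patch) of the multi-catch form this hunk switches to; both failure modes now share a single log statement:

    import java.io.IOException;

    import com.google.gson.JsonSyntaxException;

    class ConfigLoadSketch {
        void loadConfigurationFromFile(String filepath) {
            try {
                parseConfig(filepath); // hypothetical parse step that may throw either exception
            } catch (JsonSyntaxException | IOException e) {
                // one handler replaces the two separate catch blocks removed above
                System.err.println("Problem with loading configuration, file: " + filepath + ": " + e);
            }
        }

        private void parseConfig(String filepath) throws IOException {
            throw new IOException("stub for the real Gson/IO work");
        }
    }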
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 52723330..d5c8b3b2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -63,7 +63,7 @@ public class SchedulerConfig {
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
private static final Marker EXIT = MarkerFactory.getMarker("EXIT");
- private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
+ private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private Map<String, String> contextMap;
private final TaskScheduler taskScheduler;
@@ -97,8 +97,7 @@ public class SchedulerConfig {
MdcVariables.setMdcContextMap(contextMap);
logger.info(EXIT, "Stopped Datafile workflow");
MDC.clear();
- return Mono.defer(() -> Mono
- .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+ return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED));
}
/**
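A small illustration, not part of the commit, of why the defer wrapper removed above is redundant here: the ResponseEntity is constructed eagerly in either form, so Mono.just delivers the same value to subscribers without the extra lambda.

    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;

    import reactor.core.publisher.Mono;

    class StopResponseSketch {
        // equivalent to Mono.defer(() -> Mono.just(...)) when nothing needs lazy evaluation
        Mono<ResponseEntity<String>> alreadyStopped() {
            return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED));
        }
    }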
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
deleted file mode 100644
index 36279016..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.exceptions;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- */
-public class DmaapNotFoundException extends DatafileTaskException {
-
- private static final long serialVersionUID = 1L;
-
- public DmaapNotFoundException(String message) {
- super(message);
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index bdb47b2b..d0d1f91a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -37,10 +37,19 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
public abstract class FileData {
private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
+ /**
+ * @return the file name with no path
+ */
public abstract String name();
+ /**
+ * @return the URL to use to fetch the file from the PNF
+ */
public abstract String location();
+ /**
+ * @return the file transfer protocol to use for fetching the file
+ */
public abstract Scheme scheme();
public abstract String compression();
@@ -49,17 +58,25 @@ public abstract class FileData {
public abstract String fileFormatVersion();
+ public abstract MessageMetaData messageMetaData();
+
+ /**
+ * @return the name of the PNF, must be unique in the network
+ */
+ public String sourceName() {
+ return messageMetaData().sourceName();
+ }
+
public String remoteFilePath() {
return URI.create(location()).getPath();
}
public Path getLocalFileName() {
- URI uri = URI.create(location());
- return createLocalFileName(uri.getHost(), name());
+ return createLocalFileName(messageMetaData().sourceName(), name());
}
- public static Path createLocalFileName(String host, String fileName) {
- return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName);
+ public static Path createLocalFileName(String sourceName, String fileName) {
+ return Paths.get(DATAFILE_TMPDIR, sourceName + "_" + fileName);
}
public FileServerData fileServerData() {
@@ -86,4 +103,4 @@ public abstract class FileData {
}
return Optional.empty();
}
-}
\ No newline at end of file
+}
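As a usage note, a sketch (constants copied from the file above, example file name hypothetical) of the new naming rule: local file names are now keyed on the PNF sourceName carried in the event metadata rather than the host part of the location URL.

    import java.nio.file.Path;
    import java.nio.file.Paths;

    class LocalFileNameSketch {
        private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";

        // mirrors FileData.createLocalFileName after the change: keyed on sourceName, not URL host
        static Path createLocalFileName(String sourceName, String fileName) {
            return Paths.get(DATAFILE_TMPDIR, sourceName + "_" + fileName);
        }

        public static void main(String[] args) {
            // prints /tmp/onap_datafile/PNF-1_A20000626.xml.gz
            System.out.println(createLocalFileName("PNF-1", "A20000626.xml.gz"));
        }
    }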
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index e3293faa..9373a4f2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -31,9 +31,5 @@ import org.immutables.value.Value;
@Value.Immutable
@Gson.TypeAdapters
public interface FileReadyMessage {
- public String pnfName();
-
- public MessageMetaData messageMetaData();
-
public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index af45cc99..a3595ecf 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -100,9 +100,13 @@ public class JsonMessageParser {
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
JsonParser jsonParser = new JsonParser();
- return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
- : element.isJsonObject() ? Optional.of((JsonObject) element)
- : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ if (element.isJsonPrimitive()) {
+ return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject());
+ } else if (element.isJsonObject()) {
+ return Optional.of((JsonObject) element);
+ } else {
+ return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ }
}
private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
@@ -133,18 +137,17 @@ public class JsonMessageParser {
: logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
+
private Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
+ MessageMetaData messageMetaData = optionalMessageMetaData.get();
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (arrayOfNamedHashMap != null) {
- List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
+ List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap, messageMetaData);
if (!allFileDataFromJson.isEmpty()) {
- MessageMetaData messageMetaData = optionalMessageMetaData.get();
return Mono.just(ImmutableFileReadyMessage.builder() //
- .pnfName(messageMetaData.sourceName()) //
- .messageMetaData(messageMetaData) //
.files(allFileDataFromJson) //
.build());
} else {
@@ -152,13 +155,14 @@ public class JsonMessageParser {
}
}
- logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
+ logger.error("VES event parsing. Missing arrayOfNamedHashMap in message. {}", message);
return Mono.empty();
}
- logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
+ logger.error("VES event parsing. FileReady event has incorrect JsonObject. {}", message);
return Mono.empty();
}
+
private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
@@ -185,7 +189,7 @@ public class JsonMessageParser {
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
- String errorMessage = "Unable to collect file from xNF.";
+ String errorMessage = "VES event parsing.";
if (!missingValues.isEmpty()) {
errorMessage += " Missing data: " + missingValues;
}
@@ -206,11 +210,11 @@ public class JsonMessageParser {
return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
- private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) {
+ private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileInfo);
+ Optional<FileData> fileData = getFileDataFromJson(fileInfo, messageMetaData);
if (fileData.isPresent()) {
res.add(fileData.get());
@@ -219,7 +223,7 @@ public class JsonMessageParser {
return res;
}
- private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) {
+ private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
@@ -230,7 +234,7 @@ public class JsonMessageParser {
try {
scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
} catch (Exception e) {
- logger.error("Unable to collect file from xNF.", e);
+ logger.error("VES event parsing.", e);
return Optional.empty();
}
FileData fileData = ImmutableFileData.builder() //
@@ -240,12 +244,12 @@ public class JsonMessageParser {
.location(location) //
.scheme(scheme) //
.compression(getValueFromJson(data, COMPRESSION, missingValues)) //
+ .messageMetaData(messageMetaData)
.build();
if (missingValues.isEmpty()) {
return Optional.of(fileData);
}
- logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}",
- missingValues, fileInfo);
+ logger.error("VES event parsing. File information wrong. Missing data: {} Data: {}", missingValues, fileInfo);
return Optional.empty();
}
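A compact sketch, not part of the commit, of how the unfolded getJsonObjectFromAnArray handles the two common shapes of an arrayOfNamedHashMap element (an escaped JSON string versus a plain object); class and file names here are illustrative only.

    import java.util.Optional;

    import com.google.gson.JsonElement;
    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;
    import com.google.gson.JsonPrimitive;

    class JsonElementSketch {
        static Optional<JsonObject> toJsonObject(JsonElement element) {
            JsonParser parser = new JsonParser();
            if (element.isJsonPrimitive()) {
                // a primitive carries the object as escaped JSON text
                return Optional.of(parser.parse(element.getAsString()).getAsJsonObject());
            } else if (element.isJsonObject()) {
                return Optional.of((JsonObject) element);
            }
            return Optional.of(parser.parse(element.toString()).getAsJsonObject());
        }

        public static void main(String[] args) {
            JsonElement wrapped = new JsonPrimitive("{\"name\":\"A20000626.xml.gz\"}");
            toJsonObject(wrapped).ifPresent(obj -> System.out.println(obj.get("name")));
        }
    }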
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
index f6daf733..49e2f01e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
@@ -30,8 +30,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
+
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -39,25 +40,21 @@ import reactor.core.publisher.Flux;
public class DMaaPMessageConsumerTask {
private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
- private AppConfig datafileAppConfig;
- private JsonMessageParser jsonMessageParser;
- private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
+ private final JsonMessageParser jsonMessageParser;
+ private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
- this.datafileAppConfig = datafileAppConfig;
this.jsonMessageParser = new JsonMessageParser();
+ this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
- protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig,
- DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+ protected DMaaPMessageConsumerTask(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
JsonMessageParser messageParser) {
- this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
this.jsonMessageParser = messageParser;
}
public Flux<FileReadyMessage> execute() {
- dmaaPConsumerReactiveHttpClient = resolveClient();
logger.trace("execute called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
@@ -67,15 +64,10 @@ public class DMaaPMessageConsumerTask {
return jsonMessageParser.getMessagesFromJson(message);
}
- protected DmaapConsumerConfiguration resolveConfiguration() {
- return datafileAppConfig.getDmaapConsumerConfiguration();
+ private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
+ DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
+ WebClient client = new DmaapReactiveWebClient().fromConfiguration(config).build();
+ return new DMaaPConsumerReactiveHttpClient(config, client);
}
- protected DMaaPConsumerReactiveHttpClient resolveClient() {
- return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
- }
-
- protected WebClient buildWebClient() {
- return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 0fef9ab4..4207d1fc 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -84,12 +84,13 @@ public class DataRouterPublisher {
* @param model information about the file to publish
* @param numRetries the maximal number of retries if the publishing fails
* @param firstBackoff the time to delay the first retry
- * @return the HTTP response status as a string
+ * @param contextMap tracing context variables
+ * @return the (same) ConsumerDmaapModel
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.trace("Method called with arg {}", model);
+ logger.trace("Publish called with arg {}", model);
dmaapProducerReactiveHttpClient = resolveClient();
return Mono.just(model)
@@ -117,7 +118,7 @@ public class DataRouterPublisher {
logger.trace(response.toString());
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
return Mono.error(e);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index fb27a579..8849b45e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -49,31 +49,31 @@ public class FileCollector {
this.datafileAppConfig = datafileAppConfig;
}
- public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
- Duration firstBackoffTimeout, Map<String, String> contextMap) {
+ public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout,
+ Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
return Mono.just(fileData) //
.cache() //
- .flatMap(fd -> collectFile(fileData, metaData, contextMap)) //
+ .flatMap(fd -> collectFile(fileData, contextMap)) //
.retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
}
- private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
- Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.trace("starting to collectFile");
+ logger.trace("starting to collectFile {}", fileData.name());
final String remoteFile = fileData.remoteFilePath();
final Path localFile = fileData.getLocalFileName();
try (FileCollectClient currentClient = createClient(fileData)) {
+ currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
+ return Mono.just(getConsumerDmaapModel(fileData, localFile));
} catch (Exception throwable) {
- logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable);
+ logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString());
return Mono.error(throwable);
}
}
@@ -89,9 +89,9 @@ public class FileCollector {
}
}
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
+ private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) {
String location = fileData.location();
-
+ MessageMetaData metaData = fileData.messageMetaData();
return ImmutableConsumerDmaapModel.builder() //
.productName(metaData.productName()) //
.vendorName(metaData.vendorName()) //
@@ -108,16 +108,13 @@ public class FileCollector {
.build();
}
- SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
- SftpClient client = new SftpClient(fileData.fileServerData());
- client.open();
- return client;
+ protected SftpClient createSftpClient(FileData fileData) {
+ return new SftpClient(fileData.fileServerData());
}
- FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException {
+ protected FtpsClient createFtpsClient(FileData fileData) {
FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
- FtpsClient client = new FtpsClient(fileData.fileServerData());
- client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword());
- return client;
+ return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(),
+ Paths.get(config.trustedCA()), config.trustedCAPassword());
}
}
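For orientation, a minimal sketch (hypothetical types, using the Reactor 3.2-era retryBackoff operator the code above itself calls) of the retry shape around a single file download; 3 retries with an initial 5-second backoff are the values ScheduledTasks passes in.

    import java.time.Duration;

    import reactor.core.publisher.Mono;

    class FileRetrySketch {
        // the download Mono is retried with exponential backoff on any error
        static Mono<String> collectWithRetry(Mono<String> download) {
            return download.retryBackoff(3, Duration.ofSeconds(5));
        }
    }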
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index 0729caa0..41f8e3cd 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -54,7 +54,7 @@ public class PublishedChecker {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private AppConfig appConfig;
+ private final AppConfig appConfig;
/**
* Constructor.
@@ -88,12 +88,13 @@ public class PublishedChecker {
HttpResponse response =
producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
- logger.trace(response.toString());
+ logger.trace("{}", response);
int status = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
- InputStream content = entity.getContent();
- String body = IOUtils.toString(content);
- return HttpStatus.SC_OK == status && !"[]".equals(body);
+ try (InputStream content = entity.getContent()) {
+ String body = IOUtils.toString(content);
+ return HttpStatus.SC_OK == status && !"[]".equals(body);
+ }
} catch (Exception e) {
logger.warn("Unable to check if file has been published.", e);
return false;
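A self-contained sketch, not from the patch, of the resource handling this hunk adds: reading the HTTP entity body inside try-with-resources guarantees the underlying stream is closed even when reading fails.

    import java.io.InputStream;

    import org.apache.commons.io.IOUtils;
    import org.apache.http.HttpEntity;

    class EntityBodySketch {
        static String readBody(HttpEntity entity) throws Exception {
            // the stream is closed when the try block exits, normally or exceptionally
            try (InputStream content = entity.getContent()) {
                return IOUtils.toString(content);
            }
        }
    }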
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index d41e5c25..b4096c73 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -20,8 +20,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,7 +27,6 @@ import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
@@ -44,32 +41,24 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
- * files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
- private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
- private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
-
- /** Data needed for fetching of one file. */
- private class FileCollectionData {
- final FileData fileData;
- final MessageMetaData metaData;
-
- FileCollectionData(FileData fd, MessageMetaData metaData) {
- this.fileData = fd;
- this.metaData = metaData;
- }
- }
+ private static final int NUMBER_OF_WORKER_THREADS = 100;
+ private static final int MAX_TASKS_FOR_POLLING = 50;
+ private static final long DATA_ROUTER_MAX_RETRIES = 5;
+ private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
+ private static final long FILE_TRANSFER_MAX_RETRIES = 3;
+ private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(5);
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
- private final Scheduler scheduler =
- Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS);
+ private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
/**
@@ -86,21 +75,25 @@ public class ScheduledTasks {
* Main function for scheduling for the file collection Workflow.
*/
public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.trace("Execution of tasks was registered");
- applicationConfiguration.loadConfigurationFromFile();
- createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
- () -> onComplete(contextMap));
+ try {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.trace("Execution of tasks was registered");
+ applicationConfiguration.loadConfigurationFromFile();
+ createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
+ () -> onComplete(contextMap));
+ } catch (Exception e) {
+ logger.error("Unexpected exception: ", e);
+ }
}
Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
- .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
+ .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
- .flatMap(this::createFileCollectionTask) //
+ .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.filter(fileData -> shouldBePublished(fileData, contextMap)) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
+ .flatMap(fileData -> fetchFile(fileData, contextMap)) //
.flatMap(model -> publishToDataRouter(model, contextMap)) //
.doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
@@ -114,63 +107,61 @@ public class ScheduledTasks {
alreadyPublishedFiles.purge(now);
}
- private void onComplete(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Datafile tasks have been completed");
+ protected PublishedChecker createPublishedChecker() {
+ return new PublishedChecker(applicationConfiguration);
}
- private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Datafile consumed tasks {}", model.getInternalLocation());
+ protected int getCurrentNumberOfTasks() {
+ return currentNumberOfTasks.get();
}
- private void onError(Throwable throwable, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
+ protected DMaaPMessageConsumerTask createConsumerTask() {
+ return new DMaaPMessageConsumerTask(this.applicationConfiguration);
}
- private int getParallelism() {
- if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) {
- return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks();
- } else {
- return 1; // We need at least one rail/thread
- }
+ protected FileCollector createFileCollector() {
+ return new FileCollector(applicationConfiguration);
+ }
+
+ protected DataRouterPublisher createDataRouterPublisher() {
+ return new DataRouterPublisher(applicationConfiguration);
}
- private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
- List<FileCollectionData> fileCollects = new ArrayList<>();
+ private void onComplete(Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.trace("Datafile tasks have been completed");
+ }
- for (FileData fileData : availableFiles.files()) {
- fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData()));
- }
- return Flux.fromIterable(fileCollects);
+ private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.info("Datafile file published {}", model.getInternalLocation());
}
- private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) {
+ private void onError(Throwable throwable, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
+ }
+
+ private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) {
boolean result = false;
- Path localFileName = task.fileData.getLocalFileName();
+ Path localFileName = fileData.getLocalFileName();
if (alreadyPublishedFiles.put(localFileName) == null) {
result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap);
}
return result;
}
- private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
- Map<String, String> contextMap) {
- final long maxNUmberOfRetries = 3;
- final Duration initialRetryTimeout = Duration.ofSeconds(5);
-
+ private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
return createFileCollector()
- .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
- contextMap)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
+ .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
Path localFileName = fileData.getLocalFileName();
- logger.error("File fetching failed: {}", localFileName);
+ logger.error("File fetching failed, fileData {}", fileData);
deleteFile(localFileName, contextMap);
alreadyPublishedFiles.remove(localFileName);
currentNumberOfTasks.decrementAndGet();
@@ -178,19 +169,16 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
- final long maxNumberOfRetries = 3;
- final Duration initialRetryTimeout = Duration.ofSeconds(5);
-
-
MdcVariables.setMdcContextMap(contextMap);
- return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
- .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
+
+ return createDataRouterPublisher()
+ .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .onErrorResume(exception -> handlePublishFailure(model, contextMap));
}
- private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception,
- Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
+ logger.error("File publishing failed: {}", model);
Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
@@ -202,8 +190,9 @@ public class ScheduledTasks {
* Fetch more messages from the message router. This is done in a polling/blocking fashion.
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
- logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
- if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
+ if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
+ logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks());
return Flux.empty();
}
@@ -215,7 +204,8 @@ public class ScheduledTasks {
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.error("Polling for file ready message failed, exception: {}", exception);
+ logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
+ this.applicationConfiguration.getDmaapConsumerConfiguration());
return Flux.empty();
}
@@ -229,24 +219,4 @@ public class ScheduledTasks {
}
}
- PublishedChecker createPublishedChecker() {
- return new PublishedChecker(applicationConfiguration);
- }
-
- int getCurrentNumberOfTasks() {
- return currentNumberOfTasks.get();
- }
-
- DMaaPMessageConsumerTask createConsumerTask() {
- return new DMaaPMessageConsumerTask(this.applicationConfiguration);
- }
-
- FileCollector createFileCollector() {
- return new FileCollector(applicationConfiguration);
- }
-
- DataRouterPublisher createDataRouterPublisher() {
- return new DataRouterPublisher(applicationConfiguration);
- }
-
}
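Finally, a simplified sketch (string stand-ins instead of the real model types, processing steps omitted) of the pipeline shape createMainTask now has: file-ready events fan out over a fixed-size parallel scheduler, and each event is expanded into its individual files before collection and publishing.

    import java.util.List;

    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;

    class MainTaskShapeSketch {
        private static final int NUMBER_OF_WORKER_THREADS = 100;
        private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);

        Flux<String> mainTask(Flux<List<String>> fileReadyEvents) {
            return fileReadyEvents
                .parallel(NUMBER_OF_WORKER_THREADS)  // each event handled on its own rail
                .runOn(scheduler)
                .flatMap(Flux::fromIterable)         // one element per file, as the new flatMap does
                .sequential();                       // fetch, publish and cleanup steps omitted
        }
    }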