aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java3
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java14
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java52
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java11
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java108
6 files changed, 120 insertions, 74 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
index 27df49f2..c1b4c0dc 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -24,6 +24,7 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
+
import java.lang.reflect.Type;
import java.nio.file.Path;
@@ -43,7 +44,7 @@ public class CommonFunctions {
*
* @param filePublishInformation info to serialize.
*
- * @return a string with the serialized model.
+ * @return a string with the serialized info.
*/
public static String createJsonBody(FilePublishInformation filePublishInformation) {
return gson.toJson(filePublishInformation);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
index 45302423..066402b2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
@@ -17,7 +17,10 @@
package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.annotations.SerializedName;
+
import java.nio.file.Path;
+import java.util.Map;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@@ -68,4 +71,7 @@ public interface FilePublishInformation extends DmaapModel {
@SerializedName("fileFormatVersion")
String getFileFormatVersion();
+
+ @SerializedName("context")
+ Map<String, String> getContext();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
index 2643eea5..86ea5c3e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.model.logging;
import java.util.Map;
import java.util.UUID;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpRequestBase;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
@@ -84,4 +85,17 @@ public final class MappedDiagnosticContext {
MDC.put(MdcVariables.REQUEST_ID, UUID.randomUUID().toString());
return MDC.getCopyOfContextMap();
}
+
+ /**
+ * Updates the request ID in the current context.
+ * @param newRequestId the new value of the request ID
+ * @return a copy the updated context
+ */
+ public static Map<String, String> setRequestId(String newRequestId) {
+ Map<String, String> context = MDC.getCopyOfContextMap();
+ context.put(MdcVariables.REQUEST_ID, newRequestId);
+ MDC.setContextMap(context);
+ return context;
+ }
+
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index ad03170d..05f04b30 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -22,12 +22,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
-import java.util.Map;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
@@ -45,6 +46,7 @@ import org.slf4j.MDC;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+
import reactor.core.publisher.Mono;
/**
@@ -72,56 +74,55 @@ public class DataRouterPublisher {
/**
* Publish one file.
*
- * @param model information about the file to publish
+ * @param publishInfo information about the file to publish
* @param numRetries the maximal number of retries if the publishing fails
* @param firstBackoff the time to delay the first retry
- * @param contextMap tracing context variables
* @return the (same) filePublishInformation
*/
- public Mono<FilePublishInformation> publishFile(FilePublishInformation model, long numRetries,
- Duration firstBackoff, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- logger.trace("publishFile called with arg {}", model);
+ public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries,
+ Duration firstBackoff) {
+ MDC.setContextMap(publishInfo.getContext());
+ logger.trace("publishFile called with arg {}", publishInfo);
dmaapProducerReactiveHttpClient = resolveClient();
- return Mono.just(model) //
+ return Mono.just(publishInfo) //
.cache() //
- .flatMap(m -> publishFile(m, contextMap)) //
- .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
+ .flatMap(this::publishFile) //
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, publishInfo)) //
.retryBackoff(numRetries, firstBackoff);
}
- private Mono<HttpStatus> publishFile(FilePublishInformation filePublishInformation,
- Map<String, String> contextMap) {
- logger.trace("Entering publishFile with {}", filePublishInformation);
+ private Mono<HttpStatus> publishFile(FilePublishInformation publishInfo
+ ) {
+ logger.trace("Entering publishFile with {}", publishInfo);
try {
HttpPut put = new HttpPut();
- prepareHead(filePublishInformation, put);
- prepareBody(filePublishInformation, put);
+ prepareHead(publishInfo, put);
+ prepareBody(publishInfo, put);
dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
- dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
+ dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.warn("Unable to send file to DataRouter. Data: {}", filePublishInformation.getInternalLocation(), e);
+ logger.warn("Unable to send file to DataRouter. Data: {}", publishInfo.getInternalLocation(), e);
return Mono.error(e);
}
}
- private void prepareHead(FilePublishInformation model, HttpPut put) {
+ private void prepareHead(FilePublishInformation publishInfo, HttpPut put) {
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(publishInfo));
metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getPublishUri(model.getName()));
+ put.setURI(getPublishUri(publishInfo.getName()));
MappedDiagnosticContext.appendTraceInfo(put);
}
- private void prepareBody(FilePublishInformation model, HttpPut put) throws IOException {
- Path fileLocation = model.getInternalLocation();
+ private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException {
+ Path fileLocation = publishInfo.getInternalLocation();
try (InputStream fileInputStream = createInputStream(fileLocation)) {
put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
}
@@ -134,12 +135,11 @@ public class DataRouterPublisher {
.pathSegment(fileName).build();
}
- private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation model,
- Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
+ MDC.setContextMap(publishInfo.getContext());
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
- return Mono.just(model);
+ return Mono.just(publishInfo);
} else {
logger.warn("Publish to DR unsuccessful, response code: {}", response);
return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response));
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index cb93df1e..6f3f6b72 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -20,6 +20,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+
import reactor.core.publisher.Mono;
/**
@@ -75,8 +77,8 @@ public class FileCollector {
.retryBackoff(numRetries, firstBackoff);
}
- private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> context) {
+ MDC.setContextMap(context);
logger.trace("starting to collectFile {}", fileData.name());
final String remoteFile = fileData.remoteFilePath();
@@ -86,7 +88,7 @@ public class FileCollector {
currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getFilePublishInformation(fileData, localFile));
+ return Mono.just(getFilePublishInformation(fileData, localFile, context));
} catch (Exception throwable) {
logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
throwable.toString());
@@ -105,7 +107,7 @@ public class FileCollector {
}
}
- private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile) {
+ private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map<String, String> context) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
return ImmutableFilePublishInformation.builder() //
@@ -121,6 +123,7 @@ public class FileCollector {
.compression(fileData.compression()) //
.fileFormatType(fileData.fileFormatType()) //
.fileFormatVersion(fileData.fileFormatVersion()) //
+ .context(context) //
.build();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 364fa040..5cc894c3 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -40,6 +40,7 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
+
/**
* This implements the main flow of the data file collector. Fetch file ready events from the
* message router, fetch new files from the PNF publish these in the data router.
@@ -89,7 +90,7 @@ public class ScheduledTasks {
logger.trace("Execution of tasks was registered");
applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
- .subscribe(model -> onSuccess(model, context), //
+ .subscribe(this::onSuccess, //
throwable -> {
onError(throwable, context);
currentNumberOfSubscriptions.decrementAndGet();
@@ -103,18 +104,30 @@ public class ScheduledTasks {
}
}
- Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) {
+ Flux<FilePublishInformation> createMainTask(Map<String, String> context) {
return fetchMoreFileReadyMessages() //
- .parallel(NUMBER_OF_WORKER_THREADS / 2) // Each message in parallel
+ .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .filter(fileData -> shouldBePublished(fileData, contextMap)) //
- .flatMap(fileData -> fetchFile(fileData, contextMap), false, 1, 1) //
- .flatMap(model -> publishToDataRouter(model, contextMap), false, 1, 1) //
- .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
- .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
+ .flatMap(fileData -> createMdcContext(fileData, context)) //
+ .filter(this::shouldBePublished) //
+ .flatMap(this::fetchFile, false, 1, 1) //
+ .flatMap(this::publishToDataRouter,false, 1, 1) //
+ .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
+ .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
+
+ }
+
+ private class FileDataWithContext {
+ FileDataWithContext(FileData fileData, Map<String, String> context) {
+ this.fileData = fileData;
+ this.context = context;
+ }
+
+ final FileData fileData;
+ final Map<String, String> context;
}
/**
@@ -136,6 +149,10 @@ public class ScheduledTasks {
return publishedFilesCache.size();
}
+ public int getCurrentNumberOfSubscriptions() {
+ return currentNumberOfSubscriptions.get();
+ }
+
protected DMaaPMessageConsumer createConsumerTask() {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
@@ -153,21 +170,28 @@ public class ScheduledTasks {
logger.trace("Datafile tasks have been completed");
}
- private synchronized void onSuccess(FilePublishInformation model, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- logger.info("Datafile file published {}", model.getInternalLocation());
+ private synchronized void onSuccess(FilePublishInformation publishInfo) {
+ MDC.setContextMap(publishInfo.getContext());
+ logger.info("Datafile file published {}", publishInfo.getInternalLocation());
}
- private void onError(Throwable throwable, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private void onError(Throwable throwable, Map<String, String> context) {
+ MDC.setContextMap(context);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
- private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FileDataWithContext> createMdcContext(FileData fileData, Map<String, String> context) {
+ MDC.setContextMap(context);
+ context = MappedDiagnosticContext.setRequestId(fileData.name());
+ FileDataWithContext pair = new FileDataWithContext(fileData, context);
+ return Mono.just(pair);
+ }
+
+ private boolean shouldBePublished(FileDataWithContext fileData) {
boolean result = false;
- Path localFilePath = fileData.getLocalFilePath();
+ Path localFilePath = fileData.fileData.getLocalFilePath();
if (publishedFilesCache.put(localFilePath) == null) {
- result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap);
+ result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context);
}
if (!result) {
currentNumberOfTasks.decrementAndGet();
@@ -176,38 +200,36 @@ public class ScheduledTasks {
return result;
}
- private Mono<FilePublishInformation> fetchFile(FileData fileData, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- return createFileCollector()
- .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
- .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
+ private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) {
+ MDC.setContextMap(fileData.context);
+ return createFileCollector().collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES,
+ FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, fileData.context)
+ .onErrorResume(exception -> handleFetchFileFailure(fileData));
}
- private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- Path localFilePath = fileData.getLocalFilePath();
- logger.error("File fetching failed, fileData {}", fileData);
- deleteFile(localFilePath, contextMap);
+ private Mono<FilePublishInformation> handleFetchFileFailure(FileDataWithContext fileData) {
+ MDC.setContextMap(fileData.context);
+ Path localFilePath = fileData.fileData.getLocalFilePath();
+ logger.error("File fetching failed, fileData {}", fileData.fileData);
+ deleteFile(localFilePath, fileData.context);
publishedFilesCache.remove(localFilePath);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
- private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation model,
- Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation publishInfo) {
+ MDC.setContextMap(publishInfo.getContext());
return createDataRouterPublisher()
- .publishFile(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
- .onErrorResume(exception -> handlePublishFailure(model, contextMap));
+ .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT)
+ .onErrorResume(exception -> handlePublishFailure(publishInfo));
}
- private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation model,
- Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- logger.error("File publishing failed: {}", model);
- Path internalFileName = model.getInternalLocation();
- deleteFile(internalFileName, contextMap);
+ private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation publishInfo) {
+ MDC.setContextMap(publishInfo.getContext());
+ logger.error("File publishing failed: {}", publishInfo);
+ Path internalFileName = publishInfo.getInternalLocation();
+ deleteFile(internalFileName, publishInfo.getContext());
publishedFilesCache.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
@@ -221,21 +243,21 @@ public class ScheduledTasks {
"Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}",
getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
- Map<String, String> contextMap = MDC.getCopyOfContextMap();
+ Map<String, String> context = MDC.getCopyOfContextMap();
return createConsumerTask() //
.getMessageRouterResponse() //
- .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap));
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
}
- private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
+ MDC.setContextMap(context);
logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
this.applicationConfiguration.getDmaapConsumerConfiguration());
return Flux.empty();
}
- private void deleteFile(Path localFile, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private void deleteFile(Path localFile, Map<String, String> context) {
+ MDC.setContextMap(context);
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);