diff options
Diffstat (limited to 'datafile-app-server/src/main')
6 files changed, 120 insertions, 74 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java index 27df49f2..c1b4c0dc 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java @@ -24,6 +24,7 @@ import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import com.google.gson.JsonSerializationContext; import com.google.gson.JsonSerializer; + import java.lang.reflect.Type; import java.nio.file.Path; @@ -43,7 +44,7 @@ public class CommonFunctions { * * @param filePublishInformation info to serialize. * - * @return a string with the serialized model. + * @return a string with the serialized info. */ public static String createJsonBody(FilePublishInformation filePublishInformation) { return gson.toJson(filePublishInformation); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java index 45302423..066402b2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java @@ -17,7 +17,10 @@ package org.onap.dcaegen2.collectors.datafile.model; import com.google.gson.annotations.SerializedName; + import java.nio.file.Path; +import java.util.Map; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel; @@ -68,4 +71,7 @@ public interface FilePublishInformation extends DmaapModel { @SerializedName("fileFormatVersion") String getFileFormatVersion(); + + @SerializedName("context") + Map<String, String> getContext(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java index 2643eea5..86ea5c3e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java @@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.model.logging; import java.util.Map; import java.util.UUID; + import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.HttpRequestBase; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; @@ -84,4 +85,17 @@ public final class MappedDiagnosticContext { MDC.put(MdcVariables.REQUEST_ID, UUID.randomUUID().toString()); return MDC.getCopyOfContextMap(); } + + /** + * Updates the request ID in the current context. + * @param newRequestId the new value of the request ID + * @return a copy the updated context + */ + public static Map<String, String> setRequestId(String newRequestId) { + Map<String, String> context = MDC.getCopyOfContextMap(); + context.put(MdcVariables.REQUEST_ID, newRequestId); + MDC.setContextMap(context); + return context; + } + } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index ad03170d..05f04b30 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -22,12 +22,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; import com.google.gson.JsonParser; + import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.file.Path; import java.time.Duration; -import java.util.Map; + import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; @@ -45,6 +46,7 @@ import org.slf4j.MDC; import org.springframework.core.io.FileSystemResource; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; + import reactor.core.publisher.Mono; /** @@ -72,56 +74,55 @@ public class DataRouterPublisher { /** * Publish one file. * - * @param model information about the file to publish + * @param publishInfo information about the file to publish * @param numRetries the maximal number of retries if the publishing fails * @param firstBackoff the time to delay the first retry - * @param contextMap tracing context variables * @return the (same) filePublishInformation */ - public Mono<FilePublishInformation> publishFile(FilePublishInformation model, long numRetries, - Duration firstBackoff, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - logger.trace("publishFile called with arg {}", model); + public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries, + Duration firstBackoff) { + MDC.setContextMap(publishInfo.getContext()); + logger.trace("publishFile called with arg {}", publishInfo); dmaapProducerReactiveHttpClient = resolveClient(); - return Mono.just(model) // + return Mono.just(publishInfo) // .cache() // - .flatMap(m -> publishFile(m, contextMap)) // - .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) // + .flatMap(this::publishFile) // + .flatMap(httpStatus -> handleHttpResponse(httpStatus, publishInfo)) // .retryBackoff(numRetries, firstBackoff); } - private Mono<HttpStatus> publishFile(FilePublishInformation filePublishInformation, - Map<String, String> contextMap) { - logger.trace("Entering publishFile with {}", filePublishInformation); + private Mono<HttpStatus> publishFile(FilePublishInformation publishInfo + ) { + logger.trace("Entering publishFile with {}", publishInfo); try { HttpPut put = new HttpPut(); - prepareHead(filePublishInformation, put); - prepareBody(filePublishInformation, put); + prepareHead(publishInfo, put); + prepareBody(publishInfo, put); dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put); HttpResponse response = - dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap); + dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); logger.trace("{}", response); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { - logger.warn("Unable to send file to DataRouter. Data: {}", filePublishInformation.getInternalLocation(), e); + logger.warn("Unable to send file to DataRouter. Data: {}", publishInfo.getInternalLocation(), e); return Mono.error(e); } } - private void prepareHead(FilePublishInformation model, HttpPut put) { + private void prepareHead(FilePublishInformation publishInfo, HttpPut put) { put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); - JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); + JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(publishInfo)); metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); put.addHeader(X_DMAAP_DR_META, metaData.toString()); - put.setURI(getPublishUri(model.getName())); + put.setURI(getPublishUri(publishInfo.getName())); MappedDiagnosticContext.appendTraceInfo(put); } - private void prepareBody(FilePublishInformation model, HttpPut put) throws IOException { - Path fileLocation = model.getInternalLocation(); + private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException { + Path fileLocation = publishInfo.getInternalLocation(); try (InputStream fileInputStream = createInputStream(fileLocation)) { put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); } @@ -134,12 +135,11 @@ public class DataRouterPublisher { .pathSegment(fileName).build(); } - private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation model, - Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) { + MDC.setContextMap(publishInfo.getContext()); if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); - return Mono.just(model); + return Mono.just(publishInfo); } else { logger.warn("Publish to DR unsuccessful, response code: {}", response); return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response)); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index cb93df1e..6f3f6b72 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -20,6 +20,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Map; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; + import reactor.core.publisher.Mono; /** @@ -75,8 +77,8 @@ public class FileCollector { .retryBackoff(numRetries, firstBackoff); } - private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> context) { + MDC.setContextMap(context); logger.trace("starting to collectFile {}", fileData.name()); final String remoteFile = fileData.remoteFilePath(); @@ -86,7 +88,7 @@ public class FileCollector { currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); - return Mono.just(getFilePublishInformation(fileData, localFile)); + return Mono.just(getFilePublishInformation(fileData, localFile, context)); } catch (Exception throwable) { logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString()); @@ -105,7 +107,7 @@ public class FileCollector { } } - private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile) { + private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map<String, String> context) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); return ImmutableFilePublishInformation.builder() // @@ -121,6 +123,7 @@ public class FileCollector { .compression(fileData.compression()) // .fileFormatType(fileData.fileFormatType()) // .fileFormatVersion(fileData.fileFormatVersion()) // + .context(context) // .build(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 364fa040..5cc894c3 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -40,6 +40,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; + /** * This implements the main flow of the data file collector. Fetch file ready events from the * message router, fetch new files from the PNF publish these in the data router. @@ -89,7 +90,7 @@ public class ScheduledTasks { logger.trace("Execution of tasks was registered"); applicationConfiguration.loadConfigurationFromFile(); createMainTask(context) // - .subscribe(model -> onSuccess(model, context), // + .subscribe(this::onSuccess, // throwable -> { onError(throwable, context); currentNumberOfSubscriptions.decrementAndGet(); @@ -103,18 +104,30 @@ public class ScheduledTasks { } } - Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) { + Flux<FilePublishInformation> createMainTask(Map<String, String> context) { return fetchMoreFileReadyMessages() // - .parallel(NUMBER_OF_WORKER_THREADS / 2) // Each message in parallel + .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .filter(fileData -> shouldBePublished(fileData, contextMap)) // - .flatMap(fileData -> fetchFile(fileData, contextMap), false, 1, 1) // - .flatMap(model -> publishToDataRouter(model, contextMap), false, 1, 1) // - .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) // - .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // + .flatMap(fileData -> createMdcContext(fileData, context)) // + .filter(this::shouldBePublished) // + .flatMap(this::fetchFile, false, 1, 1) // + .flatMap(this::publishToDataRouter,false, 1, 1) // + .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) // + .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) // .sequential(); + + } + + private class FileDataWithContext { + FileDataWithContext(FileData fileData, Map<String, String> context) { + this.fileData = fileData; + this.context = context; + } + + final FileData fileData; + final Map<String, String> context; } /** @@ -136,6 +149,10 @@ public class ScheduledTasks { return publishedFilesCache.size(); } + public int getCurrentNumberOfSubscriptions() { + return currentNumberOfSubscriptions.get(); + } + protected DMaaPMessageConsumer createConsumerTask() { return new DMaaPMessageConsumer(this.applicationConfiguration); } @@ -153,21 +170,28 @@ public class ScheduledTasks { logger.trace("Datafile tasks have been completed"); } - private synchronized void onSuccess(FilePublishInformation model, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - logger.info("Datafile file published {}", model.getInternalLocation()); + private synchronized void onSuccess(FilePublishInformation publishInfo) { + MDC.setContextMap(publishInfo.getContext()); + logger.info("Datafile file published {}", publishInfo.getInternalLocation()); } - private void onError(Throwable throwable, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private void onError(Throwable throwable, Map<String, String> context) { + MDC.setContextMap(context); logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString()); } - private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) { + private Mono<FileDataWithContext> createMdcContext(FileData fileData, Map<String, String> context) { + MDC.setContextMap(context); + context = MappedDiagnosticContext.setRequestId(fileData.name()); + FileDataWithContext pair = new FileDataWithContext(fileData, context); + return Mono.just(pair); + } + + private boolean shouldBePublished(FileDataWithContext fileData) { boolean result = false; - Path localFilePath = fileData.getLocalFilePath(); + Path localFilePath = fileData.fileData.getLocalFilePath(); if (publishedFilesCache.put(localFilePath) == null) { - result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap); + result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context); } if (!result) { currentNumberOfTasks.decrementAndGet(); @@ -176,38 +200,36 @@ public class ScheduledTasks { return result; } - private Mono<FilePublishInformation> fetchFile(FileData fileData, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - return createFileCollector() - .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap) - .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap)); + private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) { + MDC.setContextMap(fileData.context); + return createFileCollector().collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, + FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, fileData.context) + .onErrorResume(exception -> handleFetchFileFailure(fileData)); } - private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - Path localFilePath = fileData.getLocalFilePath(); - logger.error("File fetching failed, fileData {}", fileData); - deleteFile(localFilePath, contextMap); + private Mono<FilePublishInformation> handleFetchFileFailure(FileDataWithContext fileData) { + MDC.setContextMap(fileData.context); + Path localFilePath = fileData.fileData.getLocalFilePath(); + logger.error("File fetching failed, fileData {}", fileData.fileData); + deleteFile(localFilePath, fileData.context); publishedFilesCache.remove(localFilePath); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } - private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation model, - Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation publishInfo) { + MDC.setContextMap(publishInfo.getContext()); return createDataRouterPublisher() - .publishFile(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap) - .onErrorResume(exception -> handlePublishFailure(model, contextMap)); + .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT) + .onErrorResume(exception -> handlePublishFailure(publishInfo)); } - private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation model, - Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - logger.error("File publishing failed: {}", model); - Path internalFileName = model.getInternalLocation(); - deleteFile(internalFileName, contextMap); + private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation publishInfo) { + MDC.setContextMap(publishInfo.getContext()); + logger.error("File publishing failed: {}", publishInfo); + Path internalFileName = publishInfo.getInternalLocation(); + deleteFile(internalFileName, publishInfo.getContext()); publishedFilesCache.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); @@ -221,21 +243,21 @@ public class ScheduledTasks { "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}", getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); - Map<String, String> contextMap = MDC.getCopyOfContextMap(); + Map<String, String> context = MDC.getCopyOfContextMap(); return createConsumerTask() // .getMessageRouterResponse() // - .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap)); + .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); } - private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) { + MDC.setContextMap(context); logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), this.applicationConfiguration.getDmaapConsumerConfiguration()); return Flux.empty(); } - private void deleteFile(Path localFile, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private void deleteFile(Path localFile, Map<String, String> context) { + MDC.setContextMap(context); logger.trace("Deleting file: {}", localFile); try { Files.delete(localFile); |