diff options
Diffstat (limited to 'datafile-app-server')
12 files changed, 169 insertions, 102 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java index 27df49f2..c1b4c0dc 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java @@ -24,6 +24,7 @@ import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import com.google.gson.JsonSerializationContext; import com.google.gson.JsonSerializer; + import java.lang.reflect.Type; import java.nio.file.Path; @@ -43,7 +44,7 @@ public class CommonFunctions { * * @param filePublishInformation info to serialize. * - * @return a string with the serialized model. + * @return a string with the serialized info. */ public static String createJsonBody(FilePublishInformation filePublishInformation) { return gson.toJson(filePublishInformation); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java index 45302423..066402b2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java @@ -17,7 +17,10 @@ package org.onap.dcaegen2.collectors.datafile.model; import com.google.gson.annotations.SerializedName; + import java.nio.file.Path; +import java.util.Map; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel; @@ -68,4 +71,7 @@ public interface FilePublishInformation extends DmaapModel { @SerializedName("fileFormatVersion") String getFileFormatVersion(); + + @SerializedName("context") + Map<String, String> getContext(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java index 2643eea5..86ea5c3e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java @@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.model.logging; import java.util.Map; import java.util.UUID; + import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.HttpRequestBase; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; @@ -84,4 +85,17 @@ public final class MappedDiagnosticContext { MDC.put(MdcVariables.REQUEST_ID, UUID.randomUUID().toString()); return MDC.getCopyOfContextMap(); } + + /** + * Updates the request ID in the current context. + * @param newRequestId the new value of the request ID + * @return a copy the updated context + */ + public static Map<String, String> setRequestId(String newRequestId) { + Map<String, String> context = MDC.getCopyOfContextMap(); + context.put(MdcVariables.REQUEST_ID, newRequestId); + MDC.setContextMap(context); + return context; + } + } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index ad03170d..05f04b30 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -22,12 +22,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; import com.google.gson.JsonParser; + import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.file.Path; import java.time.Duration; -import java.util.Map; + import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; @@ -45,6 +46,7 @@ import org.slf4j.MDC; import org.springframework.core.io.FileSystemResource; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; + import reactor.core.publisher.Mono; /** @@ -72,56 +74,55 @@ public class DataRouterPublisher { /** * Publish one file. * - * @param model information about the file to publish + * @param publishInfo information about the file to publish * @param numRetries the maximal number of retries if the publishing fails * @param firstBackoff the time to delay the first retry - * @param contextMap tracing context variables * @return the (same) filePublishInformation */ - public Mono<FilePublishInformation> publishFile(FilePublishInformation model, long numRetries, - Duration firstBackoff, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - logger.trace("publishFile called with arg {}", model); + public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries, + Duration firstBackoff) { + MDC.setContextMap(publishInfo.getContext()); + logger.trace("publishFile called with arg {}", publishInfo); dmaapProducerReactiveHttpClient = resolveClient(); - return Mono.just(model) // + return Mono.just(publishInfo) // .cache() // - .flatMap(m -> publishFile(m, contextMap)) // - .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) // + .flatMap(this::publishFile) // + .flatMap(httpStatus -> handleHttpResponse(httpStatus, publishInfo)) // .retryBackoff(numRetries, firstBackoff); } - private Mono<HttpStatus> publishFile(FilePublishInformation filePublishInformation, - Map<String, String> contextMap) { - logger.trace("Entering publishFile with {}", filePublishInformation); + private Mono<HttpStatus> publishFile(FilePublishInformation publishInfo + ) { + logger.trace("Entering publishFile with {}", publishInfo); try { HttpPut put = new HttpPut(); - prepareHead(filePublishInformation, put); - prepareBody(filePublishInformation, put); + prepareHead(publishInfo, put); + prepareBody(publishInfo, put); dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put); HttpResponse response = - dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap); + dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); logger.trace("{}", response); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { - logger.warn("Unable to send file to DataRouter. Data: {}", filePublishInformation.getInternalLocation(), e); + logger.warn("Unable to send file to DataRouter. Data: {}", publishInfo.getInternalLocation(), e); return Mono.error(e); } } - private void prepareHead(FilePublishInformation model, HttpPut put) { + private void prepareHead(FilePublishInformation publishInfo, HttpPut put) { put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); - JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); + JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(publishInfo)); metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); put.addHeader(X_DMAAP_DR_META, metaData.toString()); - put.setURI(getPublishUri(model.getName())); + put.setURI(getPublishUri(publishInfo.getName())); MappedDiagnosticContext.appendTraceInfo(put); } - private void prepareBody(FilePublishInformation model, HttpPut put) throws IOException { - Path fileLocation = model.getInternalLocation(); + private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException { + Path fileLocation = publishInfo.getInternalLocation(); try (InputStream fileInputStream = createInputStream(fileLocation)) { put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); } @@ -134,12 +135,11 @@ public class DataRouterPublisher { .pathSegment(fileName).build(); } - private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation model, - Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) { + MDC.setContextMap(publishInfo.getContext()); if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); - return Mono.just(model); + return Mono.just(publishInfo); } else { logger.warn("Publish to DR unsuccessful, response code: {}", response); return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response)); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index cb93df1e..6f3f6b72 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -20,6 +20,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Map; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; + import reactor.core.publisher.Mono; /** @@ -75,8 +77,8 @@ public class FileCollector { .retryBackoff(numRetries, firstBackoff); } - private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> context) { + MDC.setContextMap(context); logger.trace("starting to collectFile {}", fileData.name()); final String remoteFile = fileData.remoteFilePath(); @@ -86,7 +88,7 @@ public class FileCollector { currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); - return Mono.just(getFilePublishInformation(fileData, localFile)); + return Mono.just(getFilePublishInformation(fileData, localFile, context)); } catch (Exception throwable) { logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString()); @@ -105,7 +107,7 @@ public class FileCollector { } } - private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile) { + private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map<String, String> context) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); return ImmutableFilePublishInformation.builder() // @@ -121,6 +123,7 @@ public class FileCollector { .compression(fileData.compression()) // .fileFormatType(fileData.fileFormatType()) // .fileFormatVersion(fileData.fileFormatVersion()) // + .context(context) // .build(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 364fa040..5cc894c3 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -40,6 +40,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; + /** * This implements the main flow of the data file collector. Fetch file ready events from the * message router, fetch new files from the PNF publish these in the data router. @@ -89,7 +90,7 @@ public class ScheduledTasks { logger.trace("Execution of tasks was registered"); applicationConfiguration.loadConfigurationFromFile(); createMainTask(context) // - .subscribe(model -> onSuccess(model, context), // + .subscribe(this::onSuccess, // throwable -> { onError(throwable, context); currentNumberOfSubscriptions.decrementAndGet(); @@ -103,18 +104,30 @@ public class ScheduledTasks { } } - Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) { + Flux<FilePublishInformation> createMainTask(Map<String, String> context) { return fetchMoreFileReadyMessages() // - .parallel(NUMBER_OF_WORKER_THREADS / 2) // Each message in parallel + .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .filter(fileData -> shouldBePublished(fileData, contextMap)) // - .flatMap(fileData -> fetchFile(fileData, contextMap), false, 1, 1) // - .flatMap(model -> publishToDataRouter(model, contextMap), false, 1, 1) // - .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) // - .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // + .flatMap(fileData -> createMdcContext(fileData, context)) // + .filter(this::shouldBePublished) // + .flatMap(this::fetchFile, false, 1, 1) // + .flatMap(this::publishToDataRouter,false, 1, 1) // + .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) // + .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) // .sequential(); + + } + + private class FileDataWithContext { + FileDataWithContext(FileData fileData, Map<String, String> context) { + this.fileData = fileData; + this.context = context; + } + + final FileData fileData; + final Map<String, String> context; } /** @@ -136,6 +149,10 @@ public class ScheduledTasks { return publishedFilesCache.size(); } + public int getCurrentNumberOfSubscriptions() { + return currentNumberOfSubscriptions.get(); + } + protected DMaaPMessageConsumer createConsumerTask() { return new DMaaPMessageConsumer(this.applicationConfiguration); } @@ -153,21 +170,28 @@ public class ScheduledTasks { logger.trace("Datafile tasks have been completed"); } - private synchronized void onSuccess(FilePublishInformation model, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - logger.info("Datafile file published {}", model.getInternalLocation()); + private synchronized void onSuccess(FilePublishInformation publishInfo) { + MDC.setContextMap(publishInfo.getContext()); + logger.info("Datafile file published {}", publishInfo.getInternalLocation()); } - private void onError(Throwable throwable, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private void onError(Throwable throwable, Map<String, String> context) { + MDC.setContextMap(context); logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString()); } - private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) { + private Mono<FileDataWithContext> createMdcContext(FileData fileData, Map<String, String> context) { + MDC.setContextMap(context); + context = MappedDiagnosticContext.setRequestId(fileData.name()); + FileDataWithContext pair = new FileDataWithContext(fileData, context); + return Mono.just(pair); + } + + private boolean shouldBePublished(FileDataWithContext fileData) { boolean result = false; - Path localFilePath = fileData.getLocalFilePath(); + Path localFilePath = fileData.fileData.getLocalFilePath(); if (publishedFilesCache.put(localFilePath) == null) { - result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap); + result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context); } if (!result) { currentNumberOfTasks.decrementAndGet(); @@ -176,38 +200,36 @@ public class ScheduledTasks { return result; } - private Mono<FilePublishInformation> fetchFile(FileData fileData, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - return createFileCollector() - .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap) - .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap)); + private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) { + MDC.setContextMap(fileData.context); + return createFileCollector().collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, + FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, fileData.context) + .onErrorResume(exception -> handleFetchFileFailure(fileData)); } - private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - Path localFilePath = fileData.getLocalFilePath(); - logger.error("File fetching failed, fileData {}", fileData); - deleteFile(localFilePath, contextMap); + private Mono<FilePublishInformation> handleFetchFileFailure(FileDataWithContext fileData) { + MDC.setContextMap(fileData.context); + Path localFilePath = fileData.fileData.getLocalFilePath(); + logger.error("File fetching failed, fileData {}", fileData.fileData); + deleteFile(localFilePath, fileData.context); publishedFilesCache.remove(localFilePath); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } - private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation model, - Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation publishInfo) { + MDC.setContextMap(publishInfo.getContext()); return createDataRouterPublisher() - .publishFile(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap) - .onErrorResume(exception -> handlePublishFailure(model, contextMap)); + .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT) + .onErrorResume(exception -> handlePublishFailure(publishInfo)); } - private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation model, - Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - logger.error("File publishing failed: {}", model); - Path internalFileName = model.getInternalLocation(); - deleteFile(internalFileName, contextMap); + private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation publishInfo) { + MDC.setContextMap(publishInfo.getContext()); + logger.error("File publishing failed: {}", publishInfo); + Path internalFileName = publishInfo.getInternalLocation(); + deleteFile(internalFileName, publishInfo.getContext()); publishedFilesCache.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); return Mono.empty(); @@ -221,21 +243,21 @@ public class ScheduledTasks { "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}", getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get()); - Map<String, String> contextMap = MDC.getCopyOfContextMap(); + Map<String, String> context = MDC.getCopyOfContextMap(); return createConsumerTask() // .getMessageRouterResponse() // - .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap)); + .onErrorResume(exception -> handleConsumeMessageFailure(exception, context)); } - private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) { + MDC.setContextMap(context); logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), this.applicationConfiguration.getDmaapConsumerConfiguration()); return Flux.empty(); } - private void deleteFile(Path localFile, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); + private void deleteFile(Path localFile, Map<String, String> context) { + MDC.setContextMap(context); logger.trace("Deleting file: {}", localFile); try { Files.delete(localFile); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java index 5fdf30fc..90237c9d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java @@ -23,6 +23,8 @@ package org.onap.dcaegen2.collectors.datafile.model; import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.file.Paths; +import java.util.HashMap; + import org.junit.jupiter.api.Test; public class CommonFunctionsTest { @@ -42,6 +44,7 @@ public class CommonFunctionsTest { .compression("") // .fileFormatType("") // .fileFormatVersion("") // + .context(new HashMap<String,String>()) .build(); String actualBody = CommonFunctions.createJsonBody(filePublishInformation); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java index 950b9a6f..83c92ef4 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java @@ -18,6 +18,8 @@ package org.onap.dcaegen2.collectors.datafile.model; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.HashMap; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -50,6 +52,7 @@ public class FilePublishInformationTest { .compression(COMPRESSION) // .fileFormatType(FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .context(new HashMap<String,String>()) // .build(); Assertions.assertNotNull(filePublishInformation); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java index 06fa0b4e..5e737253 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java @@ -30,7 +30,9 @@ import static org.mockito.Mockito.when; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; + import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -47,6 +49,7 @@ import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -170,6 +173,7 @@ public class DMaaPMessageConsumerTest { .compression(GZIP_COMPRESSION) // .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .context(new HashMap<String,String>()) // .build(); listOfFilePublishInformation.add(filePublishInformation); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 5746e0fd..847d9624 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.http.Header; import org.apache.http.HttpResponse; import org.apache.http.StatusLine; @@ -54,6 +55,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub import org.springframework.http.HttpStatus; import org.springframework.web.util.DefaultUriBuilderFactory; import org.springframework.web.util.UriBuilder; + import reactor.test.StepVerifier; /** @@ -89,7 +91,7 @@ class DataRouterPublisherTest { private static DmaapProducerHttpClient httpClientMock; private static AppConfig appConfig; private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class); - private final Map<String, String> contextMap = new HashMap<>(); + private static Map<String, String> context = new HashMap<>(); private static DataRouterPublisher publisherTaskUnderTestSpy; @BeforeAll @@ -111,6 +113,7 @@ class DataRouterPublisherTest { .compression("gzip") // .fileFormatType(FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .context(context) // .build(); // appConfig = mock(AppConfig.class); publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig)); @@ -119,9 +122,8 @@ class DataRouterPublisherTest { @Test public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception { prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value())); - StepVerifier - .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0), - contextMap)) + StepVerifier // + .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0))) .expectNext(filePublishInformation) // .verifyComplete(); @@ -146,7 +148,7 @@ class DataRouterPublisherTest { Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META); Map<String, String> metaHash = getMetaDataAsMap(metaHeaders); - assertTrue(10 == metaHash.size()); + assertEquals(11, metaHash.size()); assertEquals(PRODUCT_NAME, metaHash.get("productName")); assertEquals(VENDOR_NAME, metaHash.get("vendorName")); assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec")); @@ -163,9 +165,8 @@ class DataRouterPublisherTest { void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception { prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value()); - StepVerifier - .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 2, Duration.ofSeconds(0), - contextMap)) + StepVerifier // + .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 2, Duration.ofSeconds(0))) .expectNext(filePublishInformation) // .verifyComplete(); } @@ -175,9 +176,8 @@ class DataRouterPublisherTest { prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()), Integer.valueOf(HttpStatus.OK.value())); - StepVerifier - .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0), - contextMap)) + StepVerifier // + .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0))) .expectNext(filePublishInformation) // .verifyComplete(); @@ -192,9 +192,8 @@ class DataRouterPublisherTest { prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()), Integer.valueOf((HttpStatus.BAD_GATEWAY.value()))); - StepVerifier - .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0), - contextMap)) + StepVerifier // + .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0))) .expectErrorMessage("Retries exhausted: 1/1") // .verify(); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index 085f5734..1a9d6699 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -31,6 +31,7 @@ import java.nio.file.Paths; import java.time.Duration; import java.util.HashMap; import java.util.Map; + import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; @@ -45,6 +46,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; + import reactor.test.StepVerifier; public class FileCollectorTest { @@ -130,6 +132,7 @@ public class FileCollectorTest { .compression(GZIP_COMPRESSION) // .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // + .context(new HashMap<String,String>()) .build(); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index e07ed02d..1a7db424 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -38,6 +38,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; @@ -52,6 +53,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -162,7 +164,7 @@ public class ScheduledTasksTest { .compression("") // .fileFormatType("") // .fileFormatVersion("") // - .build(); + .context(new HashMap<String, String>()).build(); } @Test @@ -190,9 +192,11 @@ public class ScheduledTasksTest { Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation()); doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull()); - doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any()); + doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull()); - StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() // + StepVerifier // + .create(testedObject.createMainTask(contextMap)) // + .expectSubscription() // .expectNextCount(noOfFiles) // .expectComplete() // .verify(); // @@ -200,7 +204,7 @@ public class ScheduledTasksTest { assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull()); - verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull(), any()); + verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); @@ -221,10 +225,12 @@ public class ScheduledTasksTest { .when(fileCollectorMock) // .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull()); - doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any()); - doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any()); + doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull()); - StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() // + StepVerifier // + .create(testedObject.createMainTask(contextMap)) // + .expectSubscription() // .expectNextCount(3) // .expectComplete() // .verify(); // @@ -232,7 +238,7 @@ public class ScheduledTasksTest { assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull()); - verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull(), any()); + verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); @@ -253,9 +259,11 @@ public class ScheduledTasksTest { // One publish will fail, the rest will succeed doReturn(collectedFile, error, collectedFile, collectedFile) // .when(dataRouterMock) // - .publishFile(notNull(), anyLong(), notNull(), any()); + .publishFile(notNull(), anyLong(), notNull()); - StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() // + StepVerifier // + .create(testedObject.createMainTask(contextMap)) // + .expectSubscription() // .expectNextCount(3) // 3 completed files .expectComplete() // .verify(); // @@ -263,7 +271,7 @@ public class ScheduledTasksTest { assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull()); - verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull(), any()); + verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); verifyNoMoreInteractions(consumerMock); @@ -282,9 +290,10 @@ public class ScheduledTasksTest { Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation()); doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull()); - doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any()); + doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull()); - StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() // + StepVerifier // + .create(testedObject.createMainTask(contextMap)).expectSubscription() // .expectNextCount(1) // 99 is skipped .expectComplete() // .verify(); // @@ -292,7 +301,7 @@ public class ScheduledTasksTest { assertEquals(0, testedObject.getCurrentNumberOfTasks()); verify(consumerMock, times(1)).getMessageRouterResponse(); verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull()); - verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull(), any()); + verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull()); verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull()); verifyNoMoreInteractions(dataRouterMock); verifyNoMoreInteractions(fileCollectorMock); |