aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java3
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java14
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java52
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java11
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java108
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java3
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java3
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java4
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java27
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java3
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java37
12 files changed, 169 insertions, 102 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
index 27df49f2..c1b4c0dc 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -24,6 +24,7 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
+
import java.lang.reflect.Type;
import java.nio.file.Path;
@@ -43,7 +44,7 @@ public class CommonFunctions {
*
* @param filePublishInformation info to serialize.
*
- * @return a string with the serialized model.
+ * @return a string with the serialized info.
*/
public static String createJsonBody(FilePublishInformation filePublishInformation) {
return gson.toJson(filePublishInformation);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
index 45302423..066402b2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
@@ -17,7 +17,10 @@
package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.annotations.SerializedName;
+
import java.nio.file.Path;
+import java.util.Map;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@@ -68,4 +71,7 @@ public interface FilePublishInformation extends DmaapModel {
@SerializedName("fileFormatVersion")
String getFileFormatVersion();
+
+ @SerializedName("context")
+ Map<String, String> getContext();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
index 2643eea5..86ea5c3e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.model.logging;
import java.util.Map;
import java.util.UUID;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpRequestBase;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
@@ -84,4 +85,17 @@ public final class MappedDiagnosticContext {
MDC.put(MdcVariables.REQUEST_ID, UUID.randomUUID().toString());
return MDC.getCopyOfContextMap();
}
+
+ /**
+ * Updates the request ID in the current context.
+ * @param newRequestId the new value of the request ID
+ * @return a copy the updated context
+ */
+ public static Map<String, String> setRequestId(String newRequestId) {
+ Map<String, String> context = MDC.getCopyOfContextMap();
+ context.put(MdcVariables.REQUEST_ID, newRequestId);
+ MDC.setContextMap(context);
+ return context;
+ }
+
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index ad03170d..05f04b30 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -22,12 +22,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
-import java.util.Map;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
@@ -45,6 +46,7 @@ import org.slf4j.MDC;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+
import reactor.core.publisher.Mono;
/**
@@ -72,56 +74,55 @@ public class DataRouterPublisher {
/**
* Publish one file.
*
- * @param model information about the file to publish
+ * @param publishInfo information about the file to publish
* @param numRetries the maximal number of retries if the publishing fails
* @param firstBackoff the time to delay the first retry
- * @param contextMap tracing context variables
* @return the (same) filePublishInformation
*/
- public Mono<FilePublishInformation> publishFile(FilePublishInformation model, long numRetries,
- Duration firstBackoff, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- logger.trace("publishFile called with arg {}", model);
+ public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries,
+ Duration firstBackoff) {
+ MDC.setContextMap(publishInfo.getContext());
+ logger.trace("publishFile called with arg {}", publishInfo);
dmaapProducerReactiveHttpClient = resolveClient();
- return Mono.just(model) //
+ return Mono.just(publishInfo) //
.cache() //
- .flatMap(m -> publishFile(m, contextMap)) //
- .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
+ .flatMap(this::publishFile) //
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, publishInfo)) //
.retryBackoff(numRetries, firstBackoff);
}
- private Mono<HttpStatus> publishFile(FilePublishInformation filePublishInformation,
- Map<String, String> contextMap) {
- logger.trace("Entering publishFile with {}", filePublishInformation);
+ private Mono<HttpStatus> publishFile(FilePublishInformation publishInfo
+ ) {
+ logger.trace("Entering publishFile with {}", publishInfo);
try {
HttpPut put = new HttpPut();
- prepareHead(filePublishInformation, put);
- prepareBody(filePublishInformation, put);
+ prepareHead(publishInfo, put);
+ prepareBody(publishInfo, put);
dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
- dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
+ dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
- logger.warn("Unable to send file to DataRouter. Data: {}", filePublishInformation.getInternalLocation(), e);
+ logger.warn("Unable to send file to DataRouter. Data: {}", publishInfo.getInternalLocation(), e);
return Mono.error(e);
}
}
- private void prepareHead(FilePublishInformation model, HttpPut put) {
+ private void prepareHead(FilePublishInformation publishInfo, HttpPut put) {
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(publishInfo));
metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getPublishUri(model.getName()));
+ put.setURI(getPublishUri(publishInfo.getName()));
MappedDiagnosticContext.appendTraceInfo(put);
}
- private void prepareBody(FilePublishInformation model, HttpPut put) throws IOException {
- Path fileLocation = model.getInternalLocation();
+ private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException {
+ Path fileLocation = publishInfo.getInternalLocation();
try (InputStream fileInputStream = createInputStream(fileLocation)) {
put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
}
@@ -134,12 +135,11 @@ public class DataRouterPublisher {
.pathSegment(fileName).build();
}
- private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation model,
- Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
+ MDC.setContextMap(publishInfo.getContext());
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
- return Mono.just(model);
+ return Mono.just(publishInfo);
} else {
logger.warn("Publish to DR unsuccessful, response code: {}", response);
return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response));
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index cb93df1e..6f3f6b72 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -20,6 +20,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+
import reactor.core.publisher.Mono;
/**
@@ -75,8 +77,8 @@ public class FileCollector {
.retryBackoff(numRetries, firstBackoff);
}
- private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> context) {
+ MDC.setContextMap(context);
logger.trace("starting to collectFile {}", fileData.name());
final String remoteFile = fileData.remoteFilePath();
@@ -86,7 +88,7 @@ public class FileCollector {
currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getFilePublishInformation(fileData, localFile));
+ return Mono.just(getFilePublishInformation(fileData, localFile, context));
} catch (Exception throwable) {
logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
throwable.toString());
@@ -105,7 +107,7 @@ public class FileCollector {
}
}
- private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile) {
+ private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map<String, String> context) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
return ImmutableFilePublishInformation.builder() //
@@ -121,6 +123,7 @@ public class FileCollector {
.compression(fileData.compression()) //
.fileFormatType(fileData.fileFormatType()) //
.fileFormatVersion(fileData.fileFormatVersion()) //
+ .context(context) //
.build();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 364fa040..5cc894c3 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -40,6 +40,7 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
+
/**
* This implements the main flow of the data file collector. Fetch file ready events from the
* message router, fetch new files from the PNF publish these in the data router.
@@ -89,7 +90,7 @@ public class ScheduledTasks {
logger.trace("Execution of tasks was registered");
applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
- .subscribe(model -> onSuccess(model, context), //
+ .subscribe(this::onSuccess, //
throwable -> {
onError(throwable, context);
currentNumberOfSubscriptions.decrementAndGet();
@@ -103,18 +104,30 @@ public class ScheduledTasks {
}
}
- Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) {
+ Flux<FilePublishInformation> createMainTask(Map<String, String> context) {
return fetchMoreFileReadyMessages() //
- .parallel(NUMBER_OF_WORKER_THREADS / 2) // Each message in parallel
+ .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .filter(fileData -> shouldBePublished(fileData, contextMap)) //
- .flatMap(fileData -> fetchFile(fileData, contextMap), false, 1, 1) //
- .flatMap(model -> publishToDataRouter(model, contextMap), false, 1, 1) //
- .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
- .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
+ .flatMap(fileData -> createMdcContext(fileData, context)) //
+ .filter(this::shouldBePublished) //
+ .flatMap(this::fetchFile, false, 1, 1) //
+ .flatMap(this::publishToDataRouter,false, 1, 1) //
+ .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
+ .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
+
+ }
+
+ private class FileDataWithContext {
+ FileDataWithContext(FileData fileData, Map<String, String> context) {
+ this.fileData = fileData;
+ this.context = context;
+ }
+
+ final FileData fileData;
+ final Map<String, String> context;
}
/**
@@ -136,6 +149,10 @@ public class ScheduledTasks {
return publishedFilesCache.size();
}
+ public int getCurrentNumberOfSubscriptions() {
+ return currentNumberOfSubscriptions.get();
+ }
+
protected DMaaPMessageConsumer createConsumerTask() {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
@@ -153,21 +170,28 @@ public class ScheduledTasks {
logger.trace("Datafile tasks have been completed");
}
- private synchronized void onSuccess(FilePublishInformation model, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- logger.info("Datafile file published {}", model.getInternalLocation());
+ private synchronized void onSuccess(FilePublishInformation publishInfo) {
+ MDC.setContextMap(publishInfo.getContext());
+ logger.info("Datafile file published {}", publishInfo.getInternalLocation());
}
- private void onError(Throwable throwable, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private void onError(Throwable throwable, Map<String, String> context) {
+ MDC.setContextMap(context);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
- private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) {
+ private Mono<FileDataWithContext> createMdcContext(FileData fileData, Map<String, String> context) {
+ MDC.setContextMap(context);
+ context = MappedDiagnosticContext.setRequestId(fileData.name());
+ FileDataWithContext pair = new FileDataWithContext(fileData, context);
+ return Mono.just(pair);
+ }
+
+ private boolean shouldBePublished(FileDataWithContext fileData) {
boolean result = false;
- Path localFilePath = fileData.getLocalFilePath();
+ Path localFilePath = fileData.fileData.getLocalFilePath();
if (publishedFilesCache.put(localFilePath) == null) {
- result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap);
+ result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context);
}
if (!result) {
currentNumberOfTasks.decrementAndGet();
@@ -176,38 +200,36 @@ public class ScheduledTasks {
return result;
}
- private Mono<FilePublishInformation> fetchFile(FileData fileData, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- return createFileCollector()
- .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
- .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
+ private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) {
+ MDC.setContextMap(fileData.context);
+ return createFileCollector().collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES,
+ FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, fileData.context)
+ .onErrorResume(exception -> handleFetchFileFailure(fileData));
}
- private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- Path localFilePath = fileData.getLocalFilePath();
- logger.error("File fetching failed, fileData {}", fileData);
- deleteFile(localFilePath, contextMap);
+ private Mono<FilePublishInformation> handleFetchFileFailure(FileDataWithContext fileData) {
+ MDC.setContextMap(fileData.context);
+ Path localFilePath = fileData.fileData.getLocalFilePath();
+ logger.error("File fetching failed, fileData {}", fileData.fileData);
+ deleteFile(localFilePath, fileData.context);
publishedFilesCache.remove(localFilePath);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
- private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation model,
- Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation publishInfo) {
+ MDC.setContextMap(publishInfo.getContext());
return createDataRouterPublisher()
- .publishFile(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
- .onErrorResume(exception -> handlePublishFailure(model, contextMap));
+ .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT)
+ .onErrorResume(exception -> handlePublishFailure(publishInfo));
}
- private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation model,
- Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- logger.error("File publishing failed: {}", model);
- Path internalFileName = model.getInternalLocation();
- deleteFile(internalFileName, contextMap);
+ private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation publishInfo) {
+ MDC.setContextMap(publishInfo.getContext());
+ logger.error("File publishing failed: {}", publishInfo);
+ Path internalFileName = publishInfo.getInternalLocation();
+ deleteFile(internalFileName, publishInfo.getContext());
publishedFilesCache.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
return Mono.empty();
@@ -221,21 +243,21 @@ public class ScheduledTasks {
"Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}",
getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
- Map<String, String> contextMap = MDC.getCopyOfContextMap();
+ Map<String, String> context = MDC.getCopyOfContextMap();
return createConsumerTask() //
.getMessageRouterResponse() //
- .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap));
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
}
- private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
+ MDC.setContextMap(context);
logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
this.applicationConfiguration.getDmaapConsumerConfiguration());
return Flux.empty();
}
- private void deleteFile(Path localFile, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
+ private void deleteFile(Path localFile, Map<String, String> context) {
+ MDC.setContextMap(context);
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
index 5fdf30fc..90237c9d 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
@@ -23,6 +23,8 @@ package org.onap.dcaegen2.collectors.datafile.model;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Paths;
+import java.util.HashMap;
+
import org.junit.jupiter.api.Test;
public class CommonFunctionsTest {
@@ -42,6 +44,7 @@ public class CommonFunctionsTest {
.compression("") //
.fileFormatType("") //
.fileFormatVersion("") //
+ .context(new HashMap<String,String>())
.build();
String actualBody = CommonFunctions.createJsonBody(filePublishInformation);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
index 950b9a6f..83c92ef4 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
@@ -18,6 +18,8 @@ package org.onap.dcaegen2.collectors.datafile.model;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.HashMap;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -50,6 +52,7 @@ public class FilePublishInformationTest {
.compression(COMPRESSION) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .context(new HashMap<String,String>()) //
.build();
Assertions.assertNotNull(filePublishInformation);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
index 06fa0b4e..5e737253 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
@@ -30,7 +30,9 @@ import static org.mockito.Mockito.when;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -47,6 +49,7 @@ import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -170,6 +173,7 @@ public class DMaaPMessageConsumerTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .context(new HashMap<String,String>()) //
.build();
listOfFilePublishInformation.add(filePublishInformation);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 5746e0fd..847d9624 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
@@ -54,6 +55,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub
import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
import org.springframework.web.util.UriBuilder;
+
import reactor.test.StepVerifier;
/**
@@ -89,7 +91,7 @@ class DataRouterPublisherTest {
private static DmaapProducerHttpClient httpClientMock;
private static AppConfig appConfig;
private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
- private final Map<String, String> contextMap = new HashMap<>();
+ private static Map<String, String> context = new HashMap<>();
private static DataRouterPublisher publisherTaskUnderTestSpy;
@BeforeAll
@@ -111,6 +113,7 @@ class DataRouterPublisherTest {
.compression("gzip") //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .context(context) //
.build(); //
appConfig = mock(AppConfig.class);
publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
@@ -119,9 +122,8 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
- StepVerifier
- .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0),
- contextMap))
+ StepVerifier //
+ .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
.expectNext(filePublishInformation) //
.verifyComplete();
@@ -146,7 +148,7 @@ class DataRouterPublisherTest {
Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META);
Map<String, String> metaHash = getMetaDataAsMap(metaHeaders);
- assertTrue(10 == metaHash.size());
+ assertEquals(11, metaHash.size());
assertEquals(PRODUCT_NAME, metaHash.get("productName"));
assertEquals(VENDOR_NAME, metaHash.get("vendorName"));
assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec"));
@@ -163,9 +165,8 @@ class DataRouterPublisherTest {
void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
- StepVerifier
- .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 2, Duration.ofSeconds(0),
- contextMap))
+ StepVerifier //
+ .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 2, Duration.ofSeconds(0)))
.expectNext(filePublishInformation) //
.verifyComplete();
}
@@ -175,9 +176,8 @@ class DataRouterPublisherTest {
prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
Integer.valueOf(HttpStatus.OK.value()));
- StepVerifier
- .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0),
- contextMap))
+ StepVerifier //
+ .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
.expectNext(filePublishInformation) //
.verifyComplete();
@@ -192,9 +192,8 @@ class DataRouterPublisherTest {
prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
- StepVerifier
- .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0),
- contextMap))
+ StepVerifier //
+ .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
.expectErrorMessage("Retries exhausted: 1/1") //
.verify();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index 085f5734..1a9d6699 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -31,6 +31,7 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -45,6 +46,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+
import reactor.test.StepVerifier;
public class FileCollectorTest {
@@ -130,6 +132,7 @@ public class FileCollectorTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .context(new HashMap<String,String>())
.build();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index e07ed02d..1a7db424 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -52,6 +53,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -162,7 +164,7 @@ public class ScheduledTasksTest {
.compression("") //
.fileFormatType("") //
.fileFormatVersion("") //
- .build();
+ .context(new HashMap<String, String>()).build();
}
@Test
@@ -190,9 +192,11 @@ public class ScheduledTasksTest {
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
- doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
- StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
+ StepVerifier //
+ .create(testedObject.createMainTask(contextMap)) //
+ .expectSubscription() //
.expectNextCount(noOfFiles) //
.expectComplete() //
.verify(); //
@@ -200,7 +204,7 @@ public class ScheduledTasksTest {
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
- verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -221,10 +225,12 @@ public class ScheduledTasksTest {
.when(fileCollectorMock) //
.collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
- doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
- doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
- StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
+ StepVerifier //
+ .create(testedObject.createMainTask(contextMap)) //
+ .expectSubscription() //
.expectNextCount(3) //
.expectComplete() //
.verify(); //
@@ -232,7 +238,7 @@ public class ScheduledTasksTest {
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
- verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -253,9 +259,11 @@ public class ScheduledTasksTest {
// One publish will fail, the rest will succeed
doReturn(collectedFile, error, collectedFile, collectedFile) //
.when(dataRouterMock) //
- .publishFile(notNull(), anyLong(), notNull(), any());
+ .publishFile(notNull(), anyLong(), notNull());
- StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
+ StepVerifier //
+ .create(testedObject.createMainTask(contextMap)) //
+ .expectSubscription() //
.expectNextCount(3) // 3 completed files
.expectComplete() //
.verify(); //
@@ -263,7 +271,7 @@ public class ScheduledTasksTest {
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
- verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
@@ -282,9 +290,10 @@ public class ScheduledTasksTest {
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
- doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
- StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
+ StepVerifier //
+ .create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(1) // 99 is skipped
.expectComplete() //
.verify(); //
@@ -292,7 +301,7 @@ public class ScheduledTasksTest {
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
- verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull(), any());
+ verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);