summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java58
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java19
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java126
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java119
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java38
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java)0
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java87
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java519
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java208
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java226
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java22
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java175
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java20
16 files changed, 1124 insertions, 579 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 82c390f7..f89a1012 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -42,6 +42,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.stereotype.Component;
/**
+ * Holds all configuration for the DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -79,8 +81,10 @@ public class AppConfig {
return ftpesConfiguration;
}
+ /**
+ * Reads the configuration from file.
+ */
public void loadConfigurationFromFile() {
-
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
JsonParser parser = new JsonParser();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index b4dc6353..52723330 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.apache.commons.lang3.StringUtils;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
@@ -40,11 +43,15 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
+ * Api for starting and stopping DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Configuration
@EnableScheduling
@@ -63,11 +70,18 @@ public class SchedulerConfig {
private final ScheduledTasks scheduledTask;
private final CloudConfiguration cloudConfiguration;
+ /**
+ * Constructor.
+ *
+ * @param taskScheduler The scheduler used to schedule the tasks.
+ * @param scheduledTasks The scheduler that will actually handle the tasks.
+ * @param cloudConfiguration The DFC configuration.
+ */
@Autowired
- public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask,
- CloudConfiguration cloudConfiguration) {
+ public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks,
+ CloudConfiguration cloudConfiguration) {
this.taskScheduler = taskScheduler;
- this.scheduledTask = scheduledTask;
+ this.scheduledTask = scheduledTasks;
this.cloudConfiguration = cloudConfiguration;
}
@@ -84,7 +98,7 @@ public class SchedulerConfig {
logger.info(EXIT, "Stopped Datafile workflow");
MDC.clear();
return Mono.defer(() -> Mono
- .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+ .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
}
/**
@@ -106,12 +120,14 @@ public class SchedulerConfig {
contextMap = MDC.getCopyOfContextMap();
logger.info(ENTRY, "Start scheduling Datafile workflow");
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(),
- SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
- SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
- SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap),
+ Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
+ scheduledFutureList.add(
+ taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList
+ .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
+ SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
return true;
} else {
@@ -119,4 +135,4 @@ public class SchedulerConfig {
}
}
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index a8f79ea1..af45cc99 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -88,11 +88,17 @@ public class JsonMessageParser {
}
}
+ /**
+ * Parses the Json message and returns a stream of messages.
+ *
+ * @param rawMessage the Json message to parse.
+ * @return a <code>Flux</code> containing messages.
+ */
public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
}
- public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+ Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
JsonParser jsonParser = new JsonParser();
return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
: element.isJsonObject() ? Optional.of((JsonObject) element)
@@ -136,13 +142,11 @@ public class JsonMessageParser {
List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
if (!allFileDataFromJson.isEmpty()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
- // @formatter:off
- return Mono.just(ImmutableFileReadyMessage.builder()
- .pnfName(messageMetaData.sourceName())
- .messageMetaData(messageMetaData)
- .files(allFileDataFromJson)
+ return Mono.just(ImmutableFileReadyMessage.builder() //
+ .pnfName(messageMetaData.sourceName()) //
+ .messageMetaData(messageMetaData) //
+ .files(allFileDataFromJson) //
.build());
- // @formatter:on
} else {
return Mono.empty();
}
@@ -168,18 +172,16 @@ public class JsonMessageParser {
// version.
getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
- // @formatter:off
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
- .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
- .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
- .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues))
- .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues))
- .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues))
- .changeIdentifier(changeIdentifier)
- .changeType(changeType)
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) //
+ .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) //
+ .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) //
+ .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) //
+ .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) //
+ .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) //
+ .changeIdentifier(changeIdentifier) //
+ .changeType(changeType) //
.build();
- // @formatter:on
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
@@ -231,16 +233,14 @@ public class JsonMessageParser {
logger.error("Unable to collect file from xNF.", e);
return Optional.empty();
}
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .name(getValueFromJson(fileInfo, NAME, missingValues))
- .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
- .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
- .location(location)
- .scheme(scheme)
- .compression(getValueFromJson(data, COMPRESSION, missingValues))
+ FileData fileData = ImmutableFileData.builder() //
+ .name(getValueFromJson(fileInfo, NAME, missingValues)) //
+ .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) //
+ .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) //
+ .location(location) //
+ .scheme(scheme) //
+ .compression(getValueFromJson(data, COMPRESSION, missingValues)) //
.build();
- // @formatter:on
if (missingValues.isEmpty()) {
return Optional.of(fileData);
}
@@ -250,8 +250,8 @@ public class JsonMessageParser {
}
/**
- * Gets data from the event name, defined as:
- * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
+ * Gets data from the event name.
+ * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
* Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 2cb84112..e2dca182 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -13,6 +13,7 @@
* the License.
* ============LICENSE_END========================================================================
*/
+
package org.onap.dcaegen2.collectors.datafile.service;
import java.nio.file.Path;
@@ -29,14 +30,30 @@ import java.util.Map;
public class PublishedFileCache {
private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
+ /**
+ * Adds a file to the cache.
+ *
+ * @param path the name of the file to add.
+ * @return <code>null</code> if the file is not already in the cache.
+ */
public Instant put(Path path) {
return publishedFiles.put(path, Instant.now());
}
+ /**
+ * Removes a file from the cache.
+ *
+ * @param localFileName name of the file to remove.
+ */
public void remove(Path localFileName) {
publishedFiles.remove(localFileName);
}
+ /**
+ * Removes files 24 hours older than the given instant.
+ *
+ * @param now the instant will determine which files that will be purged.
+ */
public void purge(Instant now) {
for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) {
Map.Entry<Path, Instant> pair = it.next();
@@ -46,7 +63,7 @@ public class PublishedFileCache {
}
}
- public int size() {
+ int size() {
return publishedFiles.size();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 57edc364..0fef9ab4 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,24 +1,46 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
@@ -26,17 +48,30 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+
import reactor.core.publisher.Mono;
/**
+ * Publishes a file to the DataRouter.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class DataRouterPublisher {
+ private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+ private static final String CONTENT_TYPE = "application/octet-stream";
+ private static final String NAME_JSON_TAG = "name";
+ private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
+ private static final String PUBLISH_TOPIC = "publish";
+ private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
+ private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -44,25 +79,70 @@ public class DataRouterPublisher {
/**
- * Publish one file
- * @param consumerDmaapModel information about the file to publish
- * @param maxNumberOfRetries the maximal number of retries if the publishing fails
- * @param firstBackoffTimeout the time to delay the first retry
+ * Publish one file.
+ *
+ * @param model information about the file to publish
+ * @param numRetries the maximal number of retries if the publishing fails
+ * @param firstBackoff the time to delay the first retry
* @return the HTTP response status as a string
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Method called with arg {}", model);
- DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
+ dmaapProducerReactiveHttpClient = resolveClient();
- //@formatter:off
return Mono.just(model)
.cache()
- .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap))
- .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap))
+ .flatMap(m -> publishFile(m, contextMap)) //
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
.retryBackoff(numRetries, firstBackoff);
- //@formatter:on
+ }
+
+ private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) {
+ logger.trace("Entering publishFile with {}", consumerDmaapModel);
+ try {
+ HttpPut put = new HttpPut();
+ String requestId = MDC.get(REQUEST_ID);
+ put.addHeader(X_ONAP_REQUEST_ID, requestId);
+ String invocationId = UUID.randomUUID().toString();
+ put.addHeader(X_INVOCATION_ID, invocationId);
+
+ prepareHead(consumerDmaapModel, put);
+ prepareBody(consumerDmaapModel, put);
+ dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+
+ HttpResponse response =
+ dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
+ logger.trace(response.toString());
+ return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+ } catch (Exception e) {
+ logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ return Mono.error(e);
+ }
+ }
+
+ private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+ put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+ metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
+ put.addHeader(X_DMAAP_DR_META, metaData.toString());
+ put.setURI(getPublishUri(model.getInternalLocation().getFileName().toString()));
+ }
+
+ private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ Path fileLocation = model.getInternalLocation();
+ try (InputStream fileInputStream = createInputStream(fileLocation)) {
+ put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ }
+ }
+
+ private URI getPublishUri(String fileName) {
+ return dmaapProducerReactiveHttpClient.getBaseUri() //
+ .pathSegment(PUBLISH_TOPIC) //
+ .pathSegment(DEFAULT_FEED_ID) //
+ .pathSegment(fileName).build();
}
private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
@@ -77,6 +157,10 @@ public class DataRouterPublisher {
}
}
+ InputStream createInputStream(Path filePath) throws IOException {
+ FileSystemResource realResource = new FileSystemResource(filePath);
+ return realResource.getInputStream();
+ }
DmaapPublisherConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapPublisherConfiguration();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index af4670e3..fb27a579 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -20,6 +20,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import reactor.core.publisher.Mono;
/**
@@ -52,12 +54,10 @@ public class FileCollector {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
- //@formatter:off
- return Mono.just(fileData)
- .cache()
- .flatMap(fd -> collectFile(fileData, metaData, contextMap))
- .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
- //@formatter:on
+ return Mono.just(fileData) //
+ .cache() //
+ .flatMap(fd -> collectFile(fileData, metaData, contextMap)) //
+ .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
}
private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
@@ -92,22 +92,20 @@ public class FileCollector {
private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
String location = fileData.location();
- // @formatter:off
- return ImmutableConsumerDmaapModel.builder()
- .productName(metaData.productName())
- .vendorName(metaData.vendorName())
- .lastEpochMicrosec(metaData.lastEpochMicrosec())
- .sourceName(metaData.sourceName())
- .startEpochMicrosec(metaData.startEpochMicrosec())
- .timeZoneOffset(metaData.timeZoneOffset())
- .name(fileData.name())
- .location(location)
- .internalLocation(localFile.toString())
- .compression(fileData.compression())
- .fileFormatType(fileData.fileFormatType())
- .fileFormatVersion(fileData.fileFormatVersion())
+ return ImmutableConsumerDmaapModel.builder() //
+ .productName(metaData.productName()) //
+ .vendorName(metaData.vendorName()) //
+ .lastEpochMicrosec(metaData.lastEpochMicrosec()) //
+ .sourceName(metaData.sourceName()) //
+ .startEpochMicrosec(metaData.startEpochMicrosec()) //
+ .timeZoneOffset(metaData.timeZoneOffset()) //
+ .name(fileData.name()) //
+ .location(location) //
+ .internalLocation(localFile) //
+ .compression(fileData.compression()) //
+ .fileFormatType(fileData.fileFormatType()) //
+ .fileFormatVersion(fileData.fileFormatVersion()) //
.build();
- // @formatter:on
}
SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
new file mode 100644
index 00000000..0729caa0
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -0,0 +1,119 @@
+/*-
+* ============LICENSE_START=======================================================
+* Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Bean used to check with DataRouter if a file has been published.
+ *
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedChecker {
+ private static final String FEEDLOG_TOPIC = "feedlog";
+ private static final String DEFAULT_FEED_ID = "1";
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private AppConfig appConfig;
+
+ /**
+ * Constructor.
+ *
+ * @param appConfig The DFC configuration.
+ */
+ public PublishedChecker(AppConfig appConfig) {
+ this.appConfig = appConfig;
+ }
+
+ /**
+ * Checks with DataRouter if the given file has been published already.
+ *
+ * @param fileName the name of the file used when it is published.
+ *
+ * @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
+ */
+ public boolean execute(String fileName, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ DmaapProducerReactiveHttpClient producerClient = resolveClient();
+
+ HttpGet getRequest = new HttpGet();
+ String requestId = MDC.get(REQUEST_ID);
+ getRequest.addHeader(X_ONAP_REQUEST_ID, requestId);
+ String invocationId = UUID.randomUUID().toString();
+ getRequest.addHeader(X_INVOCATION_ID, invocationId);
+ getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
+ producerClient.addUserCredentialsToHead(getRequest);
+
+ try {
+ HttpResponse response =
+ producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
+
+ logger.trace(response.toString());
+ int status = response.getStatusLine().getStatusCode();
+ HttpEntity entity = response.getEntity();
+ InputStream content = entity.getContent();
+ String body = IOUtils.toString(content);
+ return HttpStatus.SC_OK == status && !"[]".equals(body);
+ } catch (Exception e) {
+ logger.warn("Unable to check if file has been published.", e);
+ return false;
+ }
+ }
+
+ private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) {
+ return producerClient.getBaseUri() //
+ .pathSegment(FEEDLOG_TOPIC) //
+ .pathSegment(DEFAULT_FEED_ID) //
+ .queryParam("type", "pub") //
+ .queryParam("filename", fileName) //
+ .build();
+ }
+
+ protected DmaapPublisherConfiguration resolveConfiguration() {
+ return appConfig.getDmaapPublisherConfiguration();
+ }
+
+ protected DmaapProducerReactiveHttpClient resolveClient() {
+ return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 28963377..d41e5c25 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,13 +18,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
@@ -37,14 +37,15 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the
- * message router, fetch new files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
+ * files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
@@ -52,7 +53,7 @@ public class ScheduledTasks {
private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
- /** Data needed for fetching of one file */
+ /** Data needed for fetching of one file. */
private class FileCollectionData {
final FileData fileData;
final MessageMetaData metaData;
@@ -64,6 +65,7 @@ public class ScheduledTasks {
}
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
private final Scheduler scheduler =
@@ -96,17 +98,17 @@ public class ScheduledTasks {
.parallel(getParallelism()) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.flatMap(this::createFileCollectionTask) //
- .filter(this::shouldBePublished) //
+ .filter(fileData -> shouldBePublished(fileData, contextMap)) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
.flatMap(model -> publishToDataRouter(model, contextMap)) //
- .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) //
+ .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
}
/**
- * called in regular intervals to remove out-dated cached information
+ * called in regular intervals to remove out-dated cached information.
*/
public void purgeCachedInformation(Instant now) {
alreadyPublishedFiles.purge(now);
@@ -144,8 +146,13 @@ public class ScheduledTasks {
return Flux.fromIterable(fileCollects);
}
- private boolean shouldBePublished(FileCollectionData task) {
- return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
+ private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) {
+ boolean result = false;
+ Path localFileName = task.fileData.getLocalFileName();
+ if (alreadyPublishedFiles.put(localFileName) == null) {
+ result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap);
+ }
+ return result;
}
private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
@@ -156,7 +163,7 @@ public class ScheduledTasks {
MdcVariables.setMdcContextMap(contextMap);
return createFileCollector()
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
- contextMap)
+ contextMap)
.onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
}
@@ -174,10 +181,9 @@ public class ScheduledTasks {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
- DataRouterPublisher publisherTask = createDataRouterPublisher();
MdcVariables.setMdcContextMap(contextMap);
- return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
+ return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
.onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
}
@@ -185,7 +191,7 @@ public class ScheduledTasks {
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
- Path internalFileName = Paths.get(model.getInternalLocation());
+ Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
@@ -223,6 +229,10 @@ public class ScheduledTasks {
}
}
+ PublishedChecker createPublishedChecker() {
+ return new PublishedChecker(applicationConfiguration);
+ }
+
int getCurrentNumberOfTasks() {
return currentNumberOfTasks.get();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index 2c136304..2c136304 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
index acae1e6e..b67fac23 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
@@ -28,56 +28,49 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
class CloudConfigParserTest {
-
- private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG =
- //@formatter:on
- new ImmutableDmaapConsumerConfiguration.Builder()
- .timeoutMs(-1)
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
- .dmaapPortNumber(2222)
- .dmaapContentType("application/json")
- .messageLimit(-1)
- .dmaapProtocol("http")
- .consumerId("C12")
- .consumerGroup("OpenDCAE-c12")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+ private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+ new ImmutableDmaapConsumerConfiguration.Builder() //
+ .timeoutMs(-1) //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("admin") //
+ .dmaapUserPassword("admin") //
+ .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .dmaapPortNumber(2222) //
+ .dmaapContentType("application/json") //
+ .messageLimit(-1) //
+ .dmaapProtocol("http") //
+ .consumerId("C12") //
+ .consumerGroup("OpenDCAE-c12") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
- //@formatter:off
-
- private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG =
- //@formatter:on
- new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapTopicName("publish")
- .dmaapUserPassword("dradmin")
- .dmaapPortNumber(3907)
- .dmaapProtocol("https")
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapUserName("dradmin")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+
+ private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+ new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapTopicName("publish") //
+ .dmaapUserPassword("dradmin") //
+ .dmaapPortNumber(3907) //
+ .dmaapProtocol("https") //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("dradmin") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
- //@formatter:off
-
- private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION =
- //@formatter:on
- new ImmutableFtpesConfig.Builder()
- .keyCert("/config/ftpKey.jks")
- .keyPassword("secret")
- .trustedCA("config/cacerts")
- .trustedCAPassword("secret")
+
+ private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
+ new ImmutableFtpesConfig.Builder() //
+ .keyCert("/config/ftpKey.jks") //
+ .keyPassword("secret") //
+ .trustedCA("config/cacerts") //
+ .trustedCAPassword("secret") //
.build();
- //@formatter:off
private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index f7b83297..b33180fa 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -68,48 +68,47 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -123,48 +122,47 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String parsedString = message.getParsed();
String messageString = "[" + parsedString + "," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -178,21 +176,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutLocation_noMessage() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -206,48 +203,47 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String parsedString = message.getParsed();
String messageString = "[{\"event\":{}}," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
@@ -258,21 +254,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithFaultyEventName_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName("Faulty event name")
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName("Faulty event name") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -286,21 +281,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutName_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -314,14 +308,13 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() {
- // @formatter:off
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .build();
- // @formatter:on
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .build();
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -335,21 +328,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutCompression_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
- // @formatter:on
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -363,21 +355,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutFileFormatType_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -391,55 +382,54 @@ class JsonMessageParserTest {
@Test
void whenPassingOneCorrectJsonWithoutFileFormatVersionAndOneCorrect_oneFileData() {
- // @formatter:off
- AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .build();
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalFaultyField)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalFaultyField) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -453,14 +443,13 @@ class JsonMessageParserTest {
@Test
void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
- // @formatter:off
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier("PM_MEAS_FILES_INVALID")
- .changeType("FileReady_INVALID")
- .notificationFieldsVersion("1.0_INVALID")
- .build();
- // @formatter:on
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier("PM_MEAS_FILES_INVALID") //
+ .changeType("FileReady_INVALID") //
+ .notificationFieldsVersion("1.0_INVALID") //
+ .build();
+
String incorrectMessageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -486,21 +475,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
.compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(INCORRECT_CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(INCORRECT_CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -514,21 +502,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index f88e301d..a695e20d 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
@@ -74,7 +76,7 @@ public class DMaaPMessageConsumerTaskImplTest {
private static final String PORT_22 = "22";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME;
+ private static final Path LOCAL_FILE_LOCATION = Paths.get("target/" + PM_FILE_NAME);
private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
private static final String GZIP_COMPRESSION = "gzip";
@@ -86,7 +88,7 @@ public class DMaaPMessageConsumerTaskImplTest {
private static AppConfig appConfig;
private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
private DMaaPMessageConsumerTask messageConsumerTask;
- private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+ private DMaaPConsumerReactiveHttpClient httpClientMock;
private static String ftpesMessageString;
private static FileData ftpesFileData;
@@ -96,156 +98,163 @@ public class DMaaPMessageConsumerTaskImplTest {
private static FileData sftpFileData;
private static FileReadyMessage expectedSftpMessage;
+ /**
+ * Sets up data for the test.
+ */
@BeforeAll
public static void setUp() {
- //@formatter:off
- dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
- .consumerGroup("OpenDCAE-c12")
- .consumerId("c12")
- .dmaapContentType("application/json")
- .dmaapHostName("54.45.33.2")
- .dmaapPortNumber(1234).dmaapProtocol("https")
- .dmaapUserName("Datafile")
- .dmaapUserPassword("Datafile")
- .dmaapTopicName("unauthenticated.NOTIFICATION")
- .timeoutMs(-1)
- .messageLimit(-1)
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+ dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() //
+ .consumerGroup("OpenDCAE-c12") //
+ .consumerId("c12") //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234).dmaapProtocol("https") //
+ .dmaapUserName("Datafile") //
+ .dmaapUserPassword("Datafile") //
+ .dmaapTopicName("unauthenticated.NOTIFICATION") //
+ .timeoutMs(-1) //
+ .messageLimit(-1) //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
appConfig = mock(AppConfig.class);
- AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(FTPES_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
- .notificationFieldsVersion("1.0")
- .addAdditionalField(ftpesAdditionalField)
+ JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
+ .notificationFieldsVersion("1.0") //
+ .addAdditionalField(ftpesAdditionalField) //
.build();
ftpesMessageString = ftpesJsonMessage.toString();
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
.build();
- ftpesFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ftpesFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(FTPES_LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(ftpesFileData);
- expectedFtpesMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ expectedFtpesMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(SFTP_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(SFTP_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
- .notificationFieldsVersion("1.0")
- .addAdditionalField(sftpAdditionalField)
+ JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
+ .notificationFieldsVersion("1.0") //
+ .addAdditionalField(sftpAdditionalField) //
.build();
sftpMessageString = sftpJsonMessage.toString();
- sftpFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(SFTP_LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ sftpFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(SFTP_LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(PM_FILE_NAME) //
+ .location(FTPES_LOCATION) //
+ .internalLocation(LOCAL_FILE_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
listOfConsumerDmaapModel.add(consumerDmaapModel);
files = new ArrayList<>();
files.add(sftpFileData);
- expectedSftpMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ expectedSftpMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- //@formatter:on
}
@Test
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
prepareMocksForDmaapConsumer("", null);
- StepVerifier.create(messageConsumerTask.execute()).expectSubscription()
- .expectError(DatafileTaskException.class).verify();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectSubscription() //
+ .expectError(DatafileTaskException.class) //
+ .verify();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
}
@Test
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
- StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectNext(expectedFtpesMessage) //
+ .verifyComplete();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verifyNoMoreInteractions(httpClientMock);
}
@Test
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
- StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectNext(expectedSftpMessage) //
+ .verifyComplete();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verifyNoMoreInteractions(httpClientMock);
}
private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
Mono<String> messageAsMono = Mono.just(message);
JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
- dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
- when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
+ httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
+ when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
if (!message.isEmpty()) {
when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
@@ -255,9 +264,8 @@ public class DMaaPMessageConsumerTaskImplTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumerTask =
- spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock));
+ messageConsumerTask = spy(new DMaaPMessageConsumerTask(appConfig, httpClientMock, jsonMessageParserMock));
when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
- doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient();
+ doReturn(httpClientMock).when(messageConsumerTask).resolveClient();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index d612d17c..ed8b93f1 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -16,8 +16,9 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -25,19 +26,36 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Mono;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
import reactor.test.StepVerifier;
/**
@@ -52,31 +70,41 @@ class DataRouterPublisherTest {
private static final String START_EPOCH_MICROSEC = "8745745764578";
private static final String TIME_ZONE_OFFSET = "UTC+05:00";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+ private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + PM_FILE_NAME;
+
+ private static final String COMPRESSION = "gzip";
+ private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String FILE_FORMAT_VERSION = "V10";
+ private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+
+ private static final String HOST = "54.45.33.2";
+ private static final String HTTPS_SCHEME = "https";
+ private static final int PORT = 1234;
+ private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+ private static final String PUBLISH_TOPIC = "publish";
+ private static final String FEED_ID = "1";
+ private static final String FILE_CONTENT = "Just a string.";
+
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
private static ConsumerDmaapModel consumerDmaapModel;
- private static DataRouterPublisher dmaapPublisherTask;
- private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
+ private static DmaapProducerReactiveHttpClient httpClientMock;
private static AppConfig appConfig;
- private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static DataRouterPublisher publisherTaskUnderTestSpy;
+
+ /**
+ * Sets up data for tests.
+ */
@BeforeAll
public static void setUp() {
+ when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+ when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
- dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType("application/json") //
- .dmaapHostName("54.45.33.2") //
- .dmaapPortNumber(1234) //
- .dmaapProtocol("https") //
- .dmaapUserName("DFC") //
- .dmaapUserPassword("DFC") //
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build(); //
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
+ consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
.productName(PRODUCT_NAME) //
.vendorName(VENDOR_NAME) //
.lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
@@ -84,61 +112,145 @@ class DataRouterPublisherTest {
.startEpochMicrosec(START_EPOCH_MICROSEC) //
.timeZoneOffset(TIME_ZONE_OFFSET) //
.name(PM_FILE_NAME) //
- .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) //
- .internalLocation("target/" + PM_FILE_NAME) //
+ .location(FTPES_ADDRESS) //
+ .internalLocation(Paths.get("target/" + LOCAL_FILE_NAME)) //
.compression("gzip") //
- .fileFormatType("org.3GPP.32.435#measCollec") //
- .fileFormatVersion("V10") //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build(); //
appConfig = mock(AppConfig.class);
-
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
}
@Test
- public void whenPassedObjectFits_ReturnsCorrectStatus() {
- prepareMocksForTests(Mono.just(HttpStatus.OK));
+ public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
+
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
+
+ ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
+ verify(httpClientMock).getBaseUri();
+ verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
+ verifyNoMoreInteractions(httpClientMock);
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectNext(consumerDmaapModel).verifyComplete();
+ HttpPut actualPut = (HttpPut) requestCaptor.getValue();
+ URI actualUri = actualPut.getURI();
+ assertEquals(HTTPS_SCHEME, actualUri.getScheme());
+ assertEquals(HOST, actualUri.getHost());
+ assertEquals(PORT, actualUri.getPort());
+ Path actualPath = Paths.get(actualUri.getPath());
+ assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
+ assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
+ assertTrue(LOCAL_FILE_NAME.equals(actualPath.getName(2).toString()));
- verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ Header[] contentHeaders = actualPut.getHeaders("content-type");
+ assertEquals(APPLICATION_OCTET_STREAM_CONTENT_TYPE, contentHeaders[0].getValue());
+
+ Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META);
+ Map<String, String> metaHash = getMetaDataAsMap(metaHeaders);
+ assertTrue(10 == metaHash.size());
+ assertEquals(PRODUCT_NAME, metaHash.get("productName"));
+ assertEquals(VENDOR_NAME, metaHash.get("vendorName"));
+ assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec"));
+ assertEquals(SOURCE_NAME, metaHash.get("sourceName"));
+ assertEquals(START_EPOCH_MICROSEC, metaHash.get("startEpochMicrosec"));
+ assertEquals(TIME_ZONE_OFFSET, metaHash.get("timeZoneOffset"));
+ assertEquals(COMPRESSION, metaHash.get("compression"));
+ assertEquals(FTPES_ADDRESS, metaHash.get("location"));
+ assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
+ assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
+ }
+
+ @Test
+ void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
+ prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
+
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
}
@Test
- public void whenPassedObjectFits_firstFailsThenSucceeds() {
- prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
+ public void whenPassedObjectFits_firstFailsThenSucceeds() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
+ Integer.valueOf(HttpStatus.OK.value()));
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectNext(consumerDmaapModel).verifyComplete();
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ verify(httpClientMock, times(2)).getBaseUri();
+ verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
+ verifyNoMoreInteractions(httpClientMock);
}
@Test
- public void whenPassedObjectFits_firstFailsThenFails() {
- prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
+ public void whenPassedObjectFits_firstFailsThenFails() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
+ Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectErrorMessage("Retries exhausted: 1/1").verify();
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectErrorMessage("Retries exhausted: 1/1") //
+ .verify();
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ verify(httpClientMock, times(2)).getBaseUri();
+ verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
+ verifyNoMoreInteractions(httpClientMock);
}
@SafeVarargs
- final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
- dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any(), any())).thenReturn(firstResponse,
- nextHttpResponses);
-
- dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
- when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
- doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
+ final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
+ throws Exception {
+ httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+ when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration();
+ doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient();
+
+ UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
+ when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+
+ HttpResponse httpResponseMock = mock(HttpResponse.class);
+ if (exception == null) {
+ when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
+ .thenReturn(httpResponseMock);
+ } else {
+ when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
+ .thenThrow(exception).thenReturn(httpResponseMock);
+ }
+ StatusLine statusLineMock = mock(StatusLine.class);
+ when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+ when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses);
+
+ InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
+ doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", LOCAL_FILE_NAME));
+ }
+
+ private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) {
+ Map<String, String> metaHash = new HashMap<>();
+ String actualMetaData = metaHeaders[0].getValue();
+ actualMetaData = actualMetaData.substring(1, actualMetaData.length() - 1);
+ actualMetaData = actualMetaData.replace("\"", "");
+ String[] commaSplitedMetaData = actualMetaData.split(",");
+ for (int i = 0; i < commaSplitedMetaData.length; i++) {
+ String[] keyValuePair = commaSplitedMetaData[i].split(":");
+ if (keyValuePair.length > 2) {
+ List<String> arrayKeyValuePair = new ArrayList<>(keyValuePair.length);
+ for (int j = 1; j < keyValuePair.length; j++) {
+ arrayKeyValuePair.add(keyValuePair[j]);
+ }
+ keyValuePair[1] = String.join(":", arrayKeyValuePair);
+ }
+ metaHash.put(keyValuePair[0], keyValuePair[1]);
+ }
+ return metaHash;
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index c266d50e..fb49c860 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -134,7 +134,7 @@ public class FileCollectorTest {
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
.location(location)
- .internalLocation(LOCAL_FILE_LOCATION.toString())
+ .internalLocation(LOCAL_FILE_LOCATION)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
@@ -161,7 +161,8 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -179,7 +180,9 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier
+ .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
@@ -187,7 +190,9 @@ public class FileCollectorTest {
fileData = createFileData(SFTP_LOCATION, Scheme.SFTP);
expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier
+ .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
@@ -206,7 +211,8 @@ public class FileCollectorTest {
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 3/3").verify();
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -223,10 +229,10 @@ public class FileCollectorTest {
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
-
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
new file mode 100644
index 00000000..3e3c2ed6
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
@@ -0,0 +1,175 @@
+/*-
+* ============LICENSE_START=======================================================
+* Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
+/**
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedCheckerTest {
+ private static final String EMPTY_CONTENT = "[]";
+ private static final String FEEDLOG_TOPIC = "feedlog";
+ private static final String FEED_ID = "1";
+ private static final String HTTPS_SCHEME = "https";
+ private static final String HOST = "54.45.33.2";
+ private static final int PORT = 1234;
+ private static final String SOURCE_NAME = "oteNB5309";
+ private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME;
+
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+ private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static AppConfig appConfigMock;
+ private DmaapProducerReactiveHttpClient httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+
+ private PublishedChecker publishedCheckerUnderTestSpy;
+
+ /**
+ * Sets up data for the tests.
+ */
+ @BeforeAll
+ public static void setUp() {
+ when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+ when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+
+ appConfigMock = mock(AppConfig.class);
+ when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ }
+
+ @Test
+ public void executeWhenNotPublished_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+
+ ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
+ verify(httpClientMock).getBaseUri();
+ verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), anyInt(), any());
+ verifyNoMoreInteractions(httpClientMock);
+
+ HttpUriRequest getRequest = requestCaptor.getValue();
+ assertTrue(getRequest instanceof HttpGet);
+ URI actualUri = getRequest.getURI();
+ assertEquals(HTTPS_SCHEME, actualUri.getScheme());
+ assertEquals(HOST, actualUri.getHost());
+ assertEquals(PORT, actualUri.getPort());
+ Path actualPath = Paths.get(actualUri.getPath());
+ assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString()));
+ assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
+ String actualQuery = actualUri.getQuery();
+ assertTrue(actualQuery.contains("type=pub"));
+ assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME));
+ }
+
+ @Test
+ public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+ }
+
+ @Test
+ public void executeWhenPublished_returnsTrue() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertTrue(isPublished);
+ }
+
+ @Test
+ public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+ }
+
+ final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception {
+ publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock));
+
+ doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration();
+ doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient();
+
+ UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
+ when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+
+ HttpResponse httpResponseMock = mock(HttpResponse.class);
+ if (exception == null) {
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ .thenReturn(httpResponseMock);
+ } else {
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ .thenThrow(exception);
+ }
+ HttpEntity httpEntityMock = mock(HttpEntity.class);
+ StatusLine statusLineMock = mock(StatusLine.class);
+ when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+ when(statusLineMock.getStatusCode()).thenReturn(responseCode);
+ when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
+ InputStream stream = new ByteArrayInputStream(content.getBytes());
+ when(httpEntityMock.getContent()).thenReturn(stream);
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 8c4b3891..d781cea3 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -19,6 +19,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -26,9 +27,12 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -43,6 +47,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -56,6 +61,7 @@ public class ScheduledTasksTest {
private int uniqueValue = 0;
private DMaaPMessageConsumerTask consumerMock;
+ private PublishedChecker publishedCheckerMock;
private FileCollector fileCollectorMock;
private DataRouterPublisher dataRouterMock;
@@ -75,13 +81,15 @@ public class ScheduledTasksTest {
.keyStorePasswordPath("keyStorePasswordPath") //
.enableDmaapCertAuth(true) //
.build(); //
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
consumerMock = mock(DMaaPMessageConsumerTask.class);
+ publishedCheckerMock = mock(PublishedChecker.class);
fileCollectorMock = mock(FileCollector.class);
dataRouterMock = mock(DataRouterPublisher.class);
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
doReturn(fileCollectorMock).when(testedObject).createFileCollector();
doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
}
@@ -146,7 +154,7 @@ public class ScheduledTasksTest {
.timeZoneOffset("") //
.name("") //
.location("") //
- .internalLocation("internalLocation") //
+ .internalLocation(Paths.get("internalLocation")) //
.compression("") //
.fileFormatType("") //
.fileFormatVersion("") //
@@ -174,6 +182,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
@@ -197,6 +207,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
Mono<Object> error = Mono.error(new Exception("problem"));
@@ -228,6 +240,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
@@ -260,6 +274,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());