aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java58
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java19
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java126
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java119
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java38
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java)0
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java87
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java519
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java208
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java226
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java22
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java175
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java20
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java2
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java35
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java5
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java87
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java33
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java4
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java67
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java43
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java37
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java28
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java164
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java215
30 files changed, 1512 insertions, 917 deletions
diff --git a/.gitignore b/.gitignore
index d03f9a95..37707c82 100644
--- a/.gitignore
+++ b/.gitignore
@@ -45,7 +45,7 @@ buildNumber.properties
.mvn/timing.properties
.mvn/wrapper/maven-wrapper.jar
-# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
-hs_err_pid*
+# CheckStyle files
+.checkstyle
opt/ \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 82c390f7..f89a1012 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -42,6 +42,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.stereotype.Component;
/**
+ * Holds all configuration for the DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -79,8 +81,10 @@ public class AppConfig {
return ftpesConfiguration;
}
+ /**
+ * Reads the configuration from file.
+ */
public void loadConfigurationFromFile() {
-
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
JsonParser parser = new JsonParser();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index b4dc6353..52723330 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.apache.commons.lang3.StringUtils;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
@@ -40,11 +43,15 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
+ * Api for starting and stopping DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Configuration
@EnableScheduling
@@ -63,11 +70,18 @@ public class SchedulerConfig {
private final ScheduledTasks scheduledTask;
private final CloudConfiguration cloudConfiguration;
+ /**
+ * Constructor.
+ *
+ * @param taskScheduler The scheduler used to schedule the tasks.
+ * @param scheduledTasks The scheduler that will actually handle the tasks.
+ * @param cloudConfiguration The DFC configuration.
+ */
@Autowired
- public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask,
- CloudConfiguration cloudConfiguration) {
+ public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks,
+ CloudConfiguration cloudConfiguration) {
this.taskScheduler = taskScheduler;
- this.scheduledTask = scheduledTask;
+ this.scheduledTask = scheduledTasks;
this.cloudConfiguration = cloudConfiguration;
}
@@ -84,7 +98,7 @@ public class SchedulerConfig {
logger.info(EXIT, "Stopped Datafile workflow");
MDC.clear();
return Mono.defer(() -> Mono
- .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+ .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
}
/**
@@ -106,12 +120,14 @@ public class SchedulerConfig {
contextMap = MDC.getCopyOfContextMap();
logger.info(ENTRY, "Start scheduling Datafile workflow");
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(),
- SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
- SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
- SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap),
+ Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
+ scheduledFutureList.add(
+ taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList
+ .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
+ SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
return true;
} else {
@@ -119,4 +135,4 @@ public class SchedulerConfig {
}
}
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index a8f79ea1..af45cc99 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -88,11 +88,17 @@ public class JsonMessageParser {
}
}
+ /**
+ * Parses the Json message and returns a stream of messages.
+ *
+ * @param rawMessage the Json message to parse.
+ * @return a <code>Flux</code> containing messages.
+ */
public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
}
- public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+ Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
JsonParser jsonParser = new JsonParser();
return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
: element.isJsonObject() ? Optional.of((JsonObject) element)
@@ -136,13 +142,11 @@ public class JsonMessageParser {
List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
if (!allFileDataFromJson.isEmpty()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
- // @formatter:off
- return Mono.just(ImmutableFileReadyMessage.builder()
- .pnfName(messageMetaData.sourceName())
- .messageMetaData(messageMetaData)
- .files(allFileDataFromJson)
+ return Mono.just(ImmutableFileReadyMessage.builder() //
+ .pnfName(messageMetaData.sourceName()) //
+ .messageMetaData(messageMetaData) //
+ .files(allFileDataFromJson) //
.build());
- // @formatter:on
} else {
return Mono.empty();
}
@@ -168,18 +172,16 @@ public class JsonMessageParser {
// version.
getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
- // @formatter:off
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
- .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
- .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
- .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues))
- .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues))
- .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues))
- .changeIdentifier(changeIdentifier)
- .changeType(changeType)
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) //
+ .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) //
+ .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) //
+ .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) //
+ .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) //
+ .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) //
+ .changeIdentifier(changeIdentifier) //
+ .changeType(changeType) //
.build();
- // @formatter:on
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
@@ -231,16 +233,14 @@ public class JsonMessageParser {
logger.error("Unable to collect file from xNF.", e);
return Optional.empty();
}
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .name(getValueFromJson(fileInfo, NAME, missingValues))
- .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
- .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
- .location(location)
- .scheme(scheme)
- .compression(getValueFromJson(data, COMPRESSION, missingValues))
+ FileData fileData = ImmutableFileData.builder() //
+ .name(getValueFromJson(fileInfo, NAME, missingValues)) //
+ .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) //
+ .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) //
+ .location(location) //
+ .scheme(scheme) //
+ .compression(getValueFromJson(data, COMPRESSION, missingValues)) //
.build();
- // @formatter:on
if (missingValues.isEmpty()) {
return Optional.of(fileData);
}
@@ -250,8 +250,8 @@ public class JsonMessageParser {
}
/**
- * Gets data from the event name, defined as:
- * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
+ * Gets data from the event name.
+ * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
* Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 2cb84112..e2dca182 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -13,6 +13,7 @@
* the License.
* ============LICENSE_END========================================================================
*/
+
package org.onap.dcaegen2.collectors.datafile.service;
import java.nio.file.Path;
@@ -29,14 +30,30 @@ import java.util.Map;
public class PublishedFileCache {
private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
+ /**
+ * Adds a file to the cache.
+ *
+ * @param path the name of the file to add.
+ * @return <code>null</code> if the file is not already in the cache.
+ */
public Instant put(Path path) {
return publishedFiles.put(path, Instant.now());
}
+ /**
+ * Removes a file from the cache.
+ *
+ * @param localFileName name of the file to remove.
+ */
public void remove(Path localFileName) {
publishedFiles.remove(localFileName);
}
+ /**
+ * Removes files 24 hours older than the given instant.
+ *
+ * @param now the instant will determine which files that will be purged.
+ */
public void purge(Instant now) {
for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) {
Map.Entry<Path, Instant> pair = it.next();
@@ -46,7 +63,7 @@ public class PublishedFileCache {
}
}
- public int size() {
+ int size() {
return publishedFiles.size();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 57edc364..0fef9ab4 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,24 +1,46 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
@@ -26,17 +48,30 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+
import reactor.core.publisher.Mono;
/**
+ * Publishes a file to the DataRouter.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class DataRouterPublisher {
+ private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+ private static final String CONTENT_TYPE = "application/octet-stream";
+ private static final String NAME_JSON_TAG = "name";
+ private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
+ private static final String PUBLISH_TOPIC = "publish";
+ private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
+ private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -44,25 +79,70 @@ public class DataRouterPublisher {
/**
- * Publish one file
- * @param consumerDmaapModel information about the file to publish
- * @param maxNumberOfRetries the maximal number of retries if the publishing fails
- * @param firstBackoffTimeout the time to delay the first retry
+ * Publish one file.
+ *
+ * @param model information about the file to publish
+ * @param numRetries the maximal number of retries if the publishing fails
+ * @param firstBackoff the time to delay the first retry
* @return the HTTP response status as a string
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Method called with arg {}", model);
- DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
+ dmaapProducerReactiveHttpClient = resolveClient();
- //@formatter:off
return Mono.just(model)
.cache()
- .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap))
- .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap))
+ .flatMap(m -> publishFile(m, contextMap)) //
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
.retryBackoff(numRetries, firstBackoff);
- //@formatter:on
+ }
+
+ private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) {
+ logger.trace("Entering publishFile with {}", consumerDmaapModel);
+ try {
+ HttpPut put = new HttpPut();
+ String requestId = MDC.get(REQUEST_ID);
+ put.addHeader(X_ONAP_REQUEST_ID, requestId);
+ String invocationId = UUID.randomUUID().toString();
+ put.addHeader(X_INVOCATION_ID, invocationId);
+
+ prepareHead(consumerDmaapModel, put);
+ prepareBody(consumerDmaapModel, put);
+ dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+
+ HttpResponse response =
+ dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
+ logger.trace(response.toString());
+ return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+ } catch (Exception e) {
+ logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ return Mono.error(e);
+ }
+ }
+
+ private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+ put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+ metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
+ put.addHeader(X_DMAAP_DR_META, metaData.toString());
+ put.setURI(getPublishUri(model.getInternalLocation().getFileName().toString()));
+ }
+
+ private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ Path fileLocation = model.getInternalLocation();
+ try (InputStream fileInputStream = createInputStream(fileLocation)) {
+ put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ }
+ }
+
+ private URI getPublishUri(String fileName) {
+ return dmaapProducerReactiveHttpClient.getBaseUri() //
+ .pathSegment(PUBLISH_TOPIC) //
+ .pathSegment(DEFAULT_FEED_ID) //
+ .pathSegment(fileName).build();
}
private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
@@ -77,6 +157,10 @@ public class DataRouterPublisher {
}
}
+ InputStream createInputStream(Path filePath) throws IOException {
+ FileSystemResource realResource = new FileSystemResource(filePath);
+ return realResource.getInputStream();
+ }
DmaapPublisherConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapPublisherConfiguration();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index af4670e3..fb27a579 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -20,6 +20,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import reactor.core.publisher.Mono;
/**
@@ -52,12 +54,10 @@ public class FileCollector {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
- //@formatter:off
- return Mono.just(fileData)
- .cache()
- .flatMap(fd -> collectFile(fileData, metaData, contextMap))
- .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
- //@formatter:on
+ return Mono.just(fileData) //
+ .cache() //
+ .flatMap(fd -> collectFile(fileData, metaData, contextMap)) //
+ .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
}
private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
@@ -92,22 +92,20 @@ public class FileCollector {
private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
String location = fileData.location();
- // @formatter:off
- return ImmutableConsumerDmaapModel.builder()
- .productName(metaData.productName())
- .vendorName(metaData.vendorName())
- .lastEpochMicrosec(metaData.lastEpochMicrosec())
- .sourceName(metaData.sourceName())
- .startEpochMicrosec(metaData.startEpochMicrosec())
- .timeZoneOffset(metaData.timeZoneOffset())
- .name(fileData.name())
- .location(location)
- .internalLocation(localFile.toString())
- .compression(fileData.compression())
- .fileFormatType(fileData.fileFormatType())
- .fileFormatVersion(fileData.fileFormatVersion())
+ return ImmutableConsumerDmaapModel.builder() //
+ .productName(metaData.productName()) //
+ .vendorName(metaData.vendorName()) //
+ .lastEpochMicrosec(metaData.lastEpochMicrosec()) //
+ .sourceName(metaData.sourceName()) //
+ .startEpochMicrosec(metaData.startEpochMicrosec()) //
+ .timeZoneOffset(metaData.timeZoneOffset()) //
+ .name(fileData.name()) //
+ .location(location) //
+ .internalLocation(localFile) //
+ .compression(fileData.compression()) //
+ .fileFormatType(fileData.fileFormatType()) //
+ .fileFormatVersion(fileData.fileFormatVersion()) //
.build();
- // @formatter:on
}
SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
new file mode 100644
index 00000000..0729caa0
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -0,0 +1,119 @@
+/*-
+* ============LICENSE_START=======================================================
+* Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Bean used to check with DataRouter if a file has been published.
+ *
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedChecker {
+ private static final String FEEDLOG_TOPIC = "feedlog";
+ private static final String DEFAULT_FEED_ID = "1";
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private AppConfig appConfig;
+
+ /**
+ * Constructor.
+ *
+ * @param appConfig The DFC configuration.
+ */
+ public PublishedChecker(AppConfig appConfig) {
+ this.appConfig = appConfig;
+ }
+
+ /**
+ * Checks with DataRouter if the given file has been published already.
+ *
+ * @param fileName the name of the file used when it is published.
+ *
+ * @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
+ */
+ public boolean execute(String fileName, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ DmaapProducerReactiveHttpClient producerClient = resolveClient();
+
+ HttpGet getRequest = new HttpGet();
+ String requestId = MDC.get(REQUEST_ID);
+ getRequest.addHeader(X_ONAP_REQUEST_ID, requestId);
+ String invocationId = UUID.randomUUID().toString();
+ getRequest.addHeader(X_INVOCATION_ID, invocationId);
+ getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
+ producerClient.addUserCredentialsToHead(getRequest);
+
+ try {
+ HttpResponse response =
+ producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
+
+ logger.trace(response.toString());
+ int status = response.getStatusLine().getStatusCode();
+ HttpEntity entity = response.getEntity();
+ InputStream content = entity.getContent();
+ String body = IOUtils.toString(content);
+ return HttpStatus.SC_OK == status && !"[]".equals(body);
+ } catch (Exception e) {
+ logger.warn("Unable to check if file has been published.", e);
+ return false;
+ }
+ }
+
+ private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) {
+ return producerClient.getBaseUri() //
+ .pathSegment(FEEDLOG_TOPIC) //
+ .pathSegment(DEFAULT_FEED_ID) //
+ .queryParam("type", "pub") //
+ .queryParam("filename", fileName) //
+ .build();
+ }
+
+ protected DmaapPublisherConfiguration resolveConfiguration() {
+ return appConfig.getDmaapPublisherConfiguration();
+ }
+
+ protected DmaapProducerReactiveHttpClient resolveClient() {
+ return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 28963377..d41e5c25 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,13 +18,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
@@ -37,14 +37,15 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the
- * message router, fetch new files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
+ * files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
@@ -52,7 +53,7 @@ public class ScheduledTasks {
private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
- /** Data needed for fetching of one file */
+ /** Data needed for fetching of one file. */
private class FileCollectionData {
final FileData fileData;
final MessageMetaData metaData;
@@ -64,6 +65,7 @@ public class ScheduledTasks {
}
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
private final Scheduler scheduler =
@@ -96,17 +98,17 @@ public class ScheduledTasks {
.parallel(getParallelism()) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.flatMap(this::createFileCollectionTask) //
- .filter(this::shouldBePublished) //
+ .filter(fileData -> shouldBePublished(fileData, contextMap)) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
.flatMap(model -> publishToDataRouter(model, contextMap)) //
- .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) //
+ .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
}
/**
- * called in regular intervals to remove out-dated cached information
+ * called in regular intervals to remove out-dated cached information.
*/
public void purgeCachedInformation(Instant now) {
alreadyPublishedFiles.purge(now);
@@ -144,8 +146,13 @@ public class ScheduledTasks {
return Flux.fromIterable(fileCollects);
}
- private boolean shouldBePublished(FileCollectionData task) {
- return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
+ private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) {
+ boolean result = false;
+ Path localFileName = task.fileData.getLocalFileName();
+ if (alreadyPublishedFiles.put(localFileName) == null) {
+ result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap);
+ }
+ return result;
}
private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
@@ -156,7 +163,7 @@ public class ScheduledTasks {
MdcVariables.setMdcContextMap(contextMap);
return createFileCollector()
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
- contextMap)
+ contextMap)
.onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
}
@@ -174,10 +181,9 @@ public class ScheduledTasks {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
- DataRouterPublisher publisherTask = createDataRouterPublisher();
MdcVariables.setMdcContextMap(contextMap);
- return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
+ return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
.onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
}
@@ -185,7 +191,7 @@ public class ScheduledTasks {
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
- Path internalFileName = Paths.get(model.getInternalLocation());
+ Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
@@ -223,6 +229,10 @@ public class ScheduledTasks {
}
}
+ PublishedChecker createPublishedChecker() {
+ return new PublishedChecker(applicationConfiguration);
+ }
+
int getCurrentNumberOfTasks() {
return currentNumberOfTasks.get();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index 2c136304..2c136304 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
index acae1e6e..b67fac23 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
@@ -28,56 +28,49 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
class CloudConfigParserTest {
-
- private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG =
- //@formatter:on
- new ImmutableDmaapConsumerConfiguration.Builder()
- .timeoutMs(-1)
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
- .dmaapPortNumber(2222)
- .dmaapContentType("application/json")
- .messageLimit(-1)
- .dmaapProtocol("http")
- .consumerId("C12")
- .consumerGroup("OpenDCAE-c12")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+ private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+ new ImmutableDmaapConsumerConfiguration.Builder() //
+ .timeoutMs(-1) //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("admin") //
+ .dmaapUserPassword("admin") //
+ .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .dmaapPortNumber(2222) //
+ .dmaapContentType("application/json") //
+ .messageLimit(-1) //
+ .dmaapProtocol("http") //
+ .consumerId("C12") //
+ .consumerGroup("OpenDCAE-c12") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
- //@formatter:off
-
- private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG =
- //@formatter:on
- new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapTopicName("publish")
- .dmaapUserPassword("dradmin")
- .dmaapPortNumber(3907)
- .dmaapProtocol("https")
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapUserName("dradmin")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+
+ private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+ new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapTopicName("publish") //
+ .dmaapUserPassword("dradmin") //
+ .dmaapPortNumber(3907) //
+ .dmaapProtocol("https") //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("dradmin") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
- //@formatter:off
-
- private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION =
- //@formatter:on
- new ImmutableFtpesConfig.Builder()
- .keyCert("/config/ftpKey.jks")
- .keyPassword("secret")
- .trustedCA("config/cacerts")
- .trustedCAPassword("secret")
+
+ private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
+ new ImmutableFtpesConfig.Builder() //
+ .keyCert("/config/ftpKey.jks") //
+ .keyPassword("secret") //
+ .trustedCA("config/cacerts") //
+ .trustedCAPassword("secret") //
.build();
- //@formatter:off
private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index f7b83297..b33180fa 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -68,48 +68,47 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -123,48 +122,47 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String parsedString = message.getParsed();
String messageString = "[" + parsedString + "," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -178,21 +176,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutLocation_noMessage() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -206,48 +203,47 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String parsedString = message.getParsed();
String messageString = "[{\"event\":{}}," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
@@ -258,21 +254,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithFaultyEventName_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName("Faulty event name")
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName("Faulty event name") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -286,21 +281,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutName_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -314,14 +308,13 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() {
- // @formatter:off
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .build();
- // @formatter:on
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .build();
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -335,21 +328,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutCompression_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
- // @formatter:on
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -363,21 +355,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutFileFormatType_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -391,55 +382,54 @@ class JsonMessageParserTest {
@Test
void whenPassingOneCorrectJsonWithoutFileFormatVersionAndOneCorrect_oneFileData() {
- // @formatter:off
- AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .build();
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalFaultyField)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalFaultyField) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -453,14 +443,13 @@ class JsonMessageParserTest {
@Test
void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
- // @formatter:off
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier("PM_MEAS_FILES_INVALID")
- .changeType("FileReady_INVALID")
- .notificationFieldsVersion("1.0_INVALID")
- .build();
- // @formatter:on
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier("PM_MEAS_FILES_INVALID") //
+ .changeType("FileReady_INVALID") //
+ .notificationFieldsVersion("1.0_INVALID") //
+ .build();
+
String incorrectMessageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -486,21 +475,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
.compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(INCORRECT_CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(INCORRECT_CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -514,21 +502,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index f88e301d..a695e20d 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
@@ -74,7 +76,7 @@ public class DMaaPMessageConsumerTaskImplTest {
private static final String PORT_22 = "22";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME;
+ private static final Path LOCAL_FILE_LOCATION = Paths.get("target/" + PM_FILE_NAME);
private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
private static final String GZIP_COMPRESSION = "gzip";
@@ -86,7 +88,7 @@ public class DMaaPMessageConsumerTaskImplTest {
private static AppConfig appConfig;
private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
private DMaaPMessageConsumerTask messageConsumerTask;
- private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+ private DMaaPConsumerReactiveHttpClient httpClientMock;
private static String ftpesMessageString;
private static FileData ftpesFileData;
@@ -96,156 +98,163 @@ public class DMaaPMessageConsumerTaskImplTest {
private static FileData sftpFileData;
private static FileReadyMessage expectedSftpMessage;
+ /**
+ * Sets up data for the test.
+ */
@BeforeAll
public static void setUp() {
- //@formatter:off
- dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
- .consumerGroup("OpenDCAE-c12")
- .consumerId("c12")
- .dmaapContentType("application/json")
- .dmaapHostName("54.45.33.2")
- .dmaapPortNumber(1234).dmaapProtocol("https")
- .dmaapUserName("Datafile")
- .dmaapUserPassword("Datafile")
- .dmaapTopicName("unauthenticated.NOTIFICATION")
- .timeoutMs(-1)
- .messageLimit(-1)
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+ dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() //
+ .consumerGroup("OpenDCAE-c12") //
+ .consumerId("c12") //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234).dmaapProtocol("https") //
+ .dmaapUserName("Datafile") //
+ .dmaapUserPassword("Datafile") //
+ .dmaapTopicName("unauthenticated.NOTIFICATION") //
+ .timeoutMs(-1) //
+ .messageLimit(-1) //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
appConfig = mock(AppConfig.class);
- AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(FTPES_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
- .notificationFieldsVersion("1.0")
- .addAdditionalField(ftpesAdditionalField)
+ JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
+ .notificationFieldsVersion("1.0") //
+ .addAdditionalField(ftpesAdditionalField) //
.build();
ftpesMessageString = ftpesJsonMessage.toString();
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
.build();
- ftpesFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ftpesFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(FTPES_LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(ftpesFileData);
- expectedFtpesMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ expectedFtpesMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(SFTP_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(SFTP_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
- .notificationFieldsVersion("1.0")
- .addAdditionalField(sftpAdditionalField)
+ JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
+ .notificationFieldsVersion("1.0") //
+ .addAdditionalField(sftpAdditionalField) //
.build();
sftpMessageString = sftpJsonMessage.toString();
- sftpFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(SFTP_LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ sftpFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(SFTP_LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(PM_FILE_NAME) //
+ .location(FTPES_LOCATION) //
+ .internalLocation(LOCAL_FILE_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
listOfConsumerDmaapModel.add(consumerDmaapModel);
files = new ArrayList<>();
files.add(sftpFileData);
- expectedSftpMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ expectedSftpMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- //@formatter:on
}
@Test
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
prepareMocksForDmaapConsumer("", null);
- StepVerifier.create(messageConsumerTask.execute()).expectSubscription()
- .expectError(DatafileTaskException.class).verify();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectSubscription() //
+ .expectError(DatafileTaskException.class) //
+ .verify();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
}
@Test
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
- StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectNext(expectedFtpesMessage) //
+ .verifyComplete();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verifyNoMoreInteractions(httpClientMock);
}
@Test
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
- StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectNext(expectedSftpMessage) //
+ .verifyComplete();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verifyNoMoreInteractions(httpClientMock);
}
private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
Mono<String> messageAsMono = Mono.just(message);
JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
- dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
- when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
+ httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
+ when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
if (!message.isEmpty()) {
when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
@@ -255,9 +264,8 @@ public class DMaaPMessageConsumerTaskImplTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumerTask =
- spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock));
+ messageConsumerTask = spy(new DMaaPMessageConsumerTask(appConfig, httpClientMock, jsonMessageParserMock));
when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
- doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient();
+ doReturn(httpClientMock).when(messageConsumerTask).resolveClient();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index d612d17c..ed8b93f1 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -16,8 +16,9 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -25,19 +26,36 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Mono;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
import reactor.test.StepVerifier;
/**
@@ -52,31 +70,41 @@ class DataRouterPublisherTest {
private static final String START_EPOCH_MICROSEC = "8745745764578";
private static final String TIME_ZONE_OFFSET = "UTC+05:00";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+ private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + PM_FILE_NAME;
+
+ private static final String COMPRESSION = "gzip";
+ private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String FILE_FORMAT_VERSION = "V10";
+ private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+
+ private static final String HOST = "54.45.33.2";
+ private static final String HTTPS_SCHEME = "https";
+ private static final int PORT = 1234;
+ private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+ private static final String PUBLISH_TOPIC = "publish";
+ private static final String FEED_ID = "1";
+ private static final String FILE_CONTENT = "Just a string.";
+
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
private static ConsumerDmaapModel consumerDmaapModel;
- private static DataRouterPublisher dmaapPublisherTask;
- private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
+ private static DmaapProducerReactiveHttpClient httpClientMock;
private static AppConfig appConfig;
- private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static DataRouterPublisher publisherTaskUnderTestSpy;
+
+ /**
+ * Sets up data for tests.
+ */
@BeforeAll
public static void setUp() {
+ when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+ when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
- dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType("application/json") //
- .dmaapHostName("54.45.33.2") //
- .dmaapPortNumber(1234) //
- .dmaapProtocol("https") //
- .dmaapUserName("DFC") //
- .dmaapUserPassword("DFC") //
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build(); //
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
+ consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
.productName(PRODUCT_NAME) //
.vendorName(VENDOR_NAME) //
.lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
@@ -84,61 +112,145 @@ class DataRouterPublisherTest {
.startEpochMicrosec(START_EPOCH_MICROSEC) //
.timeZoneOffset(TIME_ZONE_OFFSET) //
.name(PM_FILE_NAME) //
- .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) //
- .internalLocation("target/" + PM_FILE_NAME) //
+ .location(FTPES_ADDRESS) //
+ .internalLocation(Paths.get("target/" + LOCAL_FILE_NAME)) //
.compression("gzip") //
- .fileFormatType("org.3GPP.32.435#measCollec") //
- .fileFormatVersion("V10") //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build(); //
appConfig = mock(AppConfig.class);
-
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
}
@Test
- public void whenPassedObjectFits_ReturnsCorrectStatus() {
- prepareMocksForTests(Mono.just(HttpStatus.OK));
+ public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
+
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
+
+ ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
+ verify(httpClientMock).getBaseUri();
+ verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
+ verifyNoMoreInteractions(httpClientMock);
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectNext(consumerDmaapModel).verifyComplete();
+ HttpPut actualPut = (HttpPut) requestCaptor.getValue();
+ URI actualUri = actualPut.getURI();
+ assertEquals(HTTPS_SCHEME, actualUri.getScheme());
+ assertEquals(HOST, actualUri.getHost());
+ assertEquals(PORT, actualUri.getPort());
+ Path actualPath = Paths.get(actualUri.getPath());
+ assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
+ assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
+ assertTrue(LOCAL_FILE_NAME.equals(actualPath.getName(2).toString()));
- verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ Header[] contentHeaders = actualPut.getHeaders("content-type");
+ assertEquals(APPLICATION_OCTET_STREAM_CONTENT_TYPE, contentHeaders[0].getValue());
+
+ Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META);
+ Map<String, String> metaHash = getMetaDataAsMap(metaHeaders);
+ assertTrue(10 == metaHash.size());
+ assertEquals(PRODUCT_NAME, metaHash.get("productName"));
+ assertEquals(VENDOR_NAME, metaHash.get("vendorName"));
+ assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec"));
+ assertEquals(SOURCE_NAME, metaHash.get("sourceName"));
+ assertEquals(START_EPOCH_MICROSEC, metaHash.get("startEpochMicrosec"));
+ assertEquals(TIME_ZONE_OFFSET, metaHash.get("timeZoneOffset"));
+ assertEquals(COMPRESSION, metaHash.get("compression"));
+ assertEquals(FTPES_ADDRESS, metaHash.get("location"));
+ assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
+ assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
+ }
+
+ @Test
+ void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
+ prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
+
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
}
@Test
- public void whenPassedObjectFits_firstFailsThenSucceeds() {
- prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
+ public void whenPassedObjectFits_firstFailsThenSucceeds() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
+ Integer.valueOf(HttpStatus.OK.value()));
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectNext(consumerDmaapModel).verifyComplete();
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ verify(httpClientMock, times(2)).getBaseUri();
+ verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
+ verifyNoMoreInteractions(httpClientMock);
}
@Test
- public void whenPassedObjectFits_firstFailsThenFails() {
- prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
+ public void whenPassedObjectFits_firstFailsThenFails() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
+ Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectErrorMessage("Retries exhausted: 1/1").verify();
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectErrorMessage("Retries exhausted: 1/1") //
+ .verify();
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ verify(httpClientMock, times(2)).getBaseUri();
+ verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
+ verifyNoMoreInteractions(httpClientMock);
}
@SafeVarargs
- final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
- dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any(), any())).thenReturn(firstResponse,
- nextHttpResponses);
-
- dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
- when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
- doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
+ final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
+ throws Exception {
+ httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+ when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration();
+ doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient();
+
+ UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
+ when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+
+ HttpResponse httpResponseMock = mock(HttpResponse.class);
+ if (exception == null) {
+ when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
+ .thenReturn(httpResponseMock);
+ } else {
+ when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
+ .thenThrow(exception).thenReturn(httpResponseMock);
+ }
+ StatusLine statusLineMock = mock(StatusLine.class);
+ when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+ when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses);
+
+ InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
+ doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", LOCAL_FILE_NAME));
+ }
+
+ private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) {
+ Map<String, String> metaHash = new HashMap<>();
+ String actualMetaData = metaHeaders[0].getValue();
+ actualMetaData = actualMetaData.substring(1, actualMetaData.length() - 1);
+ actualMetaData = actualMetaData.replace("\"", "");
+ String[] commaSplitedMetaData = actualMetaData.split(",");
+ for (int i = 0; i < commaSplitedMetaData.length; i++) {
+ String[] keyValuePair = commaSplitedMetaData[i].split(":");
+ if (keyValuePair.length > 2) {
+ List<String> arrayKeyValuePair = new ArrayList<>(keyValuePair.length);
+ for (int j = 1; j < keyValuePair.length; j++) {
+ arrayKeyValuePair.add(keyValuePair[j]);
+ }
+ keyValuePair[1] = String.join(":", arrayKeyValuePair);
+ }
+ metaHash.put(keyValuePair[0], keyValuePair[1]);
+ }
+ return metaHash;
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index c266d50e..fb49c860 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -134,7 +134,7 @@ public class FileCollectorTest {
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
.location(location)
- .internalLocation(LOCAL_FILE_LOCATION.toString())
+ .internalLocation(LOCAL_FILE_LOCATION)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
@@ -161,7 +161,8 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -179,7 +180,9 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier
+ .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
@@ -187,7 +190,9 @@ public class FileCollectorTest {
fileData = createFileData(SFTP_LOCATION, Scheme.SFTP);
expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier
+ .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
@@ -206,7 +211,8 @@ public class FileCollectorTest {
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 3/3").verify();
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -223,10 +229,10 @@ public class FileCollectorTest {
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
-
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
new file mode 100644
index 00000000..3e3c2ed6
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
@@ -0,0 +1,175 @@
+/*-
+* ============LICENSE_START=======================================================
+* Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
+/**
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedCheckerTest {
+ private static final String EMPTY_CONTENT = "[]";
+ private static final String FEEDLOG_TOPIC = "feedlog";
+ private static final String FEED_ID = "1";
+ private static final String HTTPS_SCHEME = "https";
+ private static final String HOST = "54.45.33.2";
+ private static final int PORT = 1234;
+ private static final String SOURCE_NAME = "oteNB5309";
+ private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME;
+
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+ private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static AppConfig appConfigMock;
+ private DmaapProducerReactiveHttpClient httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+
+ private PublishedChecker publishedCheckerUnderTestSpy;
+
+ /**
+ * Sets up data for the tests.
+ */
+ @BeforeAll
+ public static void setUp() {
+ when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+ when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+
+ appConfigMock = mock(AppConfig.class);
+ when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ }
+
+ @Test
+ public void executeWhenNotPublished_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+
+ ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
+ verify(httpClientMock).getBaseUri();
+ verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), anyInt(), any());
+ verifyNoMoreInteractions(httpClientMock);
+
+ HttpUriRequest getRequest = requestCaptor.getValue();
+ assertTrue(getRequest instanceof HttpGet);
+ URI actualUri = getRequest.getURI();
+ assertEquals(HTTPS_SCHEME, actualUri.getScheme());
+ assertEquals(HOST, actualUri.getHost());
+ assertEquals(PORT, actualUri.getPort());
+ Path actualPath = Paths.get(actualUri.getPath());
+ assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString()));
+ assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
+ String actualQuery = actualUri.getQuery();
+ assertTrue(actualQuery.contains("type=pub"));
+ assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME));
+ }
+
+ @Test
+ public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+ }
+
+ @Test
+ public void executeWhenPublished_returnsTrue() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertTrue(isPublished);
+ }
+
+ @Test
+ public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+ }
+
+ final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception {
+ publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock));
+
+ doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration();
+ doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient();
+
+ UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
+ when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+
+ HttpResponse httpResponseMock = mock(HttpResponse.class);
+ if (exception == null) {
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ .thenReturn(httpResponseMock);
+ } else {
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ .thenThrow(exception);
+ }
+ HttpEntity httpEntityMock = mock(HttpEntity.class);
+ StatusLine statusLineMock = mock(StatusLine.class);
+ when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+ when(statusLineMock.getStatusCode()).thenReturn(responseCode);
+ when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
+ InputStream stream = new ByteArrayInputStream(content.getBytes());
+ when(httpEntityMock.getContent()).thenReturn(stream);
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 8c4b3891..d781cea3 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -19,6 +19,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -26,9 +27,12 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -43,6 +47,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -56,6 +61,7 @@ public class ScheduledTasksTest {
private int uniqueValue = 0;
private DMaaPMessageConsumerTask consumerMock;
+ private PublishedChecker publishedCheckerMock;
private FileCollector fileCollectorMock;
private DataRouterPublisher dataRouterMock;
@@ -75,13 +81,15 @@ public class ScheduledTasksTest {
.keyStorePasswordPath("keyStorePasswordPath") //
.enableDmaapCertAuth(true) //
.build(); //
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
consumerMock = mock(DMaaPMessageConsumerTask.class);
+ publishedCheckerMock = mock(PublishedChecker.class);
fileCollectorMock = mock(FileCollector.class);
dataRouterMock = mock(DataRouterPublisher.class);
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
doReturn(fileCollectorMock).when(testedObject).createFileCollector();
doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
}
@@ -146,7 +154,7 @@ public class ScheduledTasksTest {
.timeZoneOffset("") //
.name("") //
.location("") //
- .internalLocation("internalLocation") //
+ .internalLocation(Paths.get("internalLocation")) //
.compression("") //
.fileFormatType("") //
.fileFormatVersion("") //
@@ -174,6 +182,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
@@ -197,6 +207,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
Mono<Object> error = Mono.error(new Exception("problem"));
@@ -228,6 +240,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
@@ -260,6 +274,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
index 0eaa7a17..5e08efc7 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -30,6 +30,6 @@ public class DatafileTaskException extends Exception {
}
public DatafileTaskException(String message, Exception e) {
- super(message + e);
+ super(message, e);
}
}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
index 9f3a3188..f115dba7 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -20,15 +20,44 @@ package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import java.lang.reflect.Type;
+import java.nio.file.Path;
+/**
+ * Helper class to serialize object.
+ */
public class CommonFunctions {
- private static Gson gson = new GsonBuilder().serializeNulls().create();
+ private static Gson gson =
+ new GsonBuilder().registerTypeHierarchyAdapter(Path.class, new PathConverter()).serializeNulls().create();
- private CommonFunctions() {}
+ private CommonFunctions() {
+ }
+ /**
+ * Serializes a <code>ConsumerDmaapModel</code>.
+ *
+ * @param consumerDmaapModel model to serialize.
+ *
+ * @return a string with the serialized model.
+ */
public static String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
return gson.toJson(consumerDmaapModel);
}
-} \ No newline at end of file
+
+ /**
+ * Json serializer that handles Path serializations, since <code>Path</code> does not implement the
+ * <code>Serializable</code> interface.
+ */
+ public static class PathConverter implements JsonSerializer<Path> {
+ @Override
+ public JsonElement serialize(Path path, Type type, JsonSerializationContext jsonSerializationContext) {
+ return new JsonPrimitive(path.toString());
+ }
+ }
+}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
index 972316bf..2337485a 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
@@ -17,6 +17,9 @@
package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.annotations.SerializedName;
+
+import java.nio.file.Path;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@@ -55,7 +58,7 @@ public interface ConsumerDmaapModel extends DmaapModel {
String getLocation();
@SerializedName("internalLocation")
- String getInternalLocation();
+ Path getInternalLocation();
@SerializedName("compression")
String getCompression();
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
index cbc3e122..25f0dbfc 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
@@ -1,58 +1,51 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.model;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-import org.junit.jupiter.api.Test;
+import java.nio.file.Paths;
-class CommonFunctionsTest {
- // @formatter:off
- private ConsumerDmaapModel model = ImmutableConsumerDmaapModel.builder()
- .productName("NrRadio")
- .vendorName("Ericsson")
- .lastEpochMicrosec("8745745764578")
- .sourceName("oteNB5309")
- .startEpochMicrosec("8745745764578")
- .timeZoneOffset("UTC+05:00")
- .name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz")
- .internalLocation("target/A20161224.1030-1045.bin.gz")
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
- .build();
+import org.junit.jupiter.api.Test;
- private static final String EXPECTED_RESULT =
- "{\"productName\":\"NrRadio\","
- + "\"vendorName\":\"Ericsson\","
- + "\"lastEpochMicrosec\":\"8745745764578\","
- + "\"sourceName\":\"oteNB5309\","
- + "\"startEpochMicrosec\":\"8745745764578\","
- + "\"timeZoneOffset\":\"UTC+05:00\","
- + "\"name\":\"A20161224.1030-1045.bin.gz\","
- + "\"location\":\"ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz\","
- + "\"internalLocation\":\"target/A20161224.1030-1045.bin.gz\","
- + "\"compression\":\"gzip\","
- + "\"fileFormatType\":\"org.3GPP.32.435#measCollec\","
- + "\"fileFormatVersion\":\"V10\"}";
- // @formatter:on
+public class CommonFunctionsTest {
@Test
- void createJsonBody_shouldReturnJsonInString() {
- assertEquals(EXPECTED_RESULT, CommonFunctions.createJsonBody(model));
+ public void createJsonBody_success() {
+ ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel //
+ .builder() //
+ .productName("") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .name("") //
+ .location("") //
+ .internalLocation(Paths.get("internalLocation")) //
+ .compression("") //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .build();
+ String actualBody = CommonFunctions.createJsonBody(consumerDmaapModel);
+
+ assertTrue(actualBody.contains("\"internalLocation\":\"internalLocation\""));
}
-} \ No newline at end of file
+}
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
index 2c5e701d..0c1ac436 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
@@ -16,6 +16,9 @@
package org.onap.dcaegen2.collectors.datafile.model;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -28,29 +31,27 @@ public class ConsumerDmaapModelTest {
private static final String TIME_ZONE_OFFSET = "UTC+05:00";
private static final String NAME = "A20161224.1030-1045.bin.gz";
private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz";
- private static final String INTERNAL_LOCATION = "target/A20161224.1030-1045.bin.gz";
+ private static final Path INTERNAL_LOCATION = Paths.get("target/A20161224.1030-1045.bin.gz");
private static final String COMPRESSION = "gzip";
private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
@Test
public void consumerDmaapModelBuilder_shouldBuildAnObject() {
- // @formatter:off
- ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(NAME)
- .location(LOCATION)
- .internalLocation(INTERNAL_LOCATION)
- .compression(COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(NAME) //
+ .location(LOCATION) //
+ .internalLocation(INTERNAL_LOCATION) //
+ .compression(COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- // @formatter:on
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(PRODUCT_NAME, consumerDmaapModel.getProductName());
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 1bf3ec5a..4283debf 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -67,7 +67,7 @@ public class FtpsClient implements FileCollectClient {
} catch (DatafileTaskException e) {
throw e;
} catch (Exception e) {
- throw new DatafileTaskException("Could not open connection: ", e);
+ throw new DatafileTaskException("Could not open connection: " + e, e);
}
}
@@ -100,7 +100,7 @@ public class FtpsClient implements FileCollectClient {
throw new DatafileTaskException("Could not retrieve file " + remoteFileName);
}
} catch (IOException e) {
- throw new DatafileTaskException("Could not fetch file: ", e);
+ throw new DatafileTaskException("Could not fetch file: " + e, e);
}
logger.trace("collectFile fetched: {}", localFileName);
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index 2f489166..dec8af42 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -79,7 +79,7 @@ public class SftpClient implements FileCollectClient {
sftpChannel = getChannel(session);
}
} catch (JSchException e) {
- throw new DatafileTaskException("Could not open Sftp client", e);
+ throw new DatafileTaskException("Could not open Sftp client" + e, e);
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
new file mode 100644
index 00000000..e01a941b
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class HttpAsyncClientBuilderWrapper implements IHttpAsyncClientBuilder {
+ HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
+
+ @Override
+ public IHttpAsyncClientBuilder setRedirectStrategy(RedirectStrategy redirectStrategy) {
+ builder.setRedirectStrategy(redirectStrategy);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setSSLContext(SSLContext sslcontext) {
+ builder.setSSLContext(sslcontext);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setSSLHostnameVerifier(HostnameVerifier hostnameVerifier) {
+ builder.setSSLHostnameVerifier(hostnameVerifier);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setDefaultRequestConfig(RequestConfig config) {
+ builder.setDefaultRequestConfig(config);
+ return this;
+ }
+
+ @Override
+ public CloseableHttpAsyncClient build() {
+ return builder.build();
+ }
+
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java
new file mode 100644
index 00000000..e0a51a80
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface IHttpAsyncClientBuilder {
+ public IHttpAsyncClientBuilder setRedirectStrategy(final RedirectStrategy redirectStrategy);
+
+ public IHttpAsyncClientBuilder setSSLContext(final SSLContext sslcontext);
+
+ public IHttpAsyncClientBuilder setSSLHostnameVerifier(final HostnameVerifier hostnameVerifier);
+
+ public IHttpAsyncClientBuilder setDefaultRequestConfig(final RequestConfig config);
+
+ public CloseableHttpAsyncClient build();
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
deleted file mode 100644
index 5295b124..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-import org.springframework.core.io.FileSystemResource;
-
-public class FileSystemResourceWrapper implements IFileSystemResource {
- private FileSystemResource realResource;
-
- @Override
- public void setPath(Path path) {
- realResource = new FileSystemResource(path);
- }
- @Override
- public InputStream getInputStream() throws IOException {
- return realResource.getInputStream();
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
deleted file mode 100644
index 23f14a33..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-
-public interface IFileSystemResource {
-
- public void setPath(Path filePath);
-
- public InputStream getInputStream() throws IOException;
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 9304688f..9bd5d57f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -16,54 +16,34 @@
package org.onap.dcaegen2.collectors.datafile.service.producer;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.Future;
import javax.net.ssl.SSLContext;
import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.ssl.SSLContextBuilder;
-import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
-import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
+import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import reactor.core.publisher.Mono;
+import org.springframework.web.util.UriBuilder;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -71,11 +51,7 @@ import reactor.core.publisher.Mono;
*/
public class DmaapProducerReactiveHttpClient {
- private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
- private static final String NAME_JSON_TAG = "name";
- private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
- private static final String URI_SEPARATOR = "/";
- private static final String DEFAULT_FEED_ID = "1";
+ private static final int NO_REQUEST_TIMEOUT = -1;
private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
@@ -83,14 +59,10 @@ public class DmaapProducerReactiveHttpClient {
private final String dmaapHostName;
private final Integer dmaapPortNumber;
- private final String dmaapTopicName;
private final String dmaapProtocol;
- private final String dmaapContentType;
private final String user;
private final String pwd;
- private IFileSystemResource fileResource = new FileSystemResourceWrapper();
-
/**
* Constructor DmaapProducerReactiveHttpClient.
*
@@ -99,96 +71,86 @@ public class DmaapProducerReactiveHttpClient {
public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
- this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
- this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
this.user = dmaapPublisherConfiguration.dmaapUserName();
this.pwd = dmaapPublisherConfiguration.dmaapUserPassword();
}
- /**
- * Function for calling DMaaP HTTP producer - post request to DMaaP DataRouter.
- *
- * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
- * @return status code of operation
- */
- public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel,
- Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- try (CloseableHttpAsyncClient webClient = createWebClient()) {
-
- HttpPut put = new HttpPut();
- prepareHead(consumerDmaapModel, put);
- prepareBody(consumerDmaapModel, put);
- addUserCredentialsToHead(put);
-
- logger.trace(INVOKE, "Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
- Future<HttpResponse> future = webClient.execute(put, null);
+ public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap)
+ throws DatafileTaskException {
+ try (CloseableHttpAsyncClient webClient = createWebClient(true, NO_REQUEST_TIMEOUT)) {
+ MdcVariables.setMdcContextMap(contextMap);
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
HttpResponse response = future.get();
- logger.trace(INVOKE_RETURN, "Response from DR {}", response);
- return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+ return response;
} catch (Exception e) {
- logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
- return Mono.error(e);
+ throw new DatafileTaskException("Unable to create web client.", e);
}
}
- private void addUserCredentialsToHead(HttpPut put) {
+ public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, int requestTimeout,
+ Map<String, String> contextMap) throws DatafileTaskException {
+ try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) {
+ MdcVariables.setMdcContextMap(contextMap);
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
+ HttpResponse response = future.get();
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+ return response;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to create web client.", e);
+ }
+ }
+
+ public void addUserCredentialsToHead(HttpUriRequest request) {
String plainCreds = user + ":" + pwd;
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
String base64Creds = new String(base64CredsBytes);
logger.trace("base64Creds...: {}", base64Creds);
- put.addHeader("Authorization", "Basic " + base64Creds);
+ request.addHeader("Authorization", "Basic " + base64Creds);
}
- private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
- put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
- String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
- metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
- put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getUri(name));
-
- String requestID = MDC.get(REQUEST_ID);
- put.addHeader(X_ONAP_REQUEST_ID, requestID);
- String invocationID = UUID.randomUUID().toString();
- put.addHeader(X_INVOCATION_ID, invocationID);
+ public UriBuilder getBaseUri() {
+ return new DefaultUriBuilderFactory().builder() //
+ .scheme(dmaapProtocol) //
+ .host(dmaapHostName) //
+ .port(dmaapPortNumber);
}
- private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
- Path fileLocation = Paths.get(model.getInternalLocation());
- this.fileResource.setPath(fileLocation);
- InputStream fileInputStream = fileResource.getInputStream();
-
- put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, int requestTimeout)
+ throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ SSLContext sslContext =
+ new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
- }
+ IHttpAsyncClientBuilder clientBuilder = getHttpClientBuilder();
+ clientBuilder.setSSLContext(sslContext) //
+ .setSSLHostnameVerifier(new NoopHostnameVerifier());
- private URI getUri(String fileName) {
- String path = dmaapTopicName + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + fileName;
- return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
- .path(path).build();
- }
+ if (expectRedirect) {
+ clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ }
- void setFileSystemResource(IFileSystemResource fileSystemResource) {
- fileResource = fileSystemResource;
- }
+ if (requestTimeout > NO_REQUEST_TIMEOUT) {
+ RequestConfig requestConfig = RequestConfig.custom() //
+ .setSocketTimeout(requestTimeout) //
+ .setConnectTimeout(requestTimeout) //
+ .setConnectionRequestTimeout(requestTimeout) //
+ .build();
- protected CloseableHttpAsyncClient createWebClient()
- throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ clientBuilder.setDefaultRequestConfig(requestConfig);
+ }
- SSLContext sslContext = new SSLContextBuilder() //
- .loadTrustMaterial(null, (certificate, authType) -> true) //
- .build();
-
- CloseableHttpAsyncClient webClient = HttpAsyncClients.custom() //
- .setSSLContext(sslContext) //
- .setSSLHostnameVerifier(new NoopHostnameVerifier()) //
- .setRedirectStrategy(PublishRedirectStrategy.INSTANCE) //
- .build();
- webClient.start();
- return webClient;
+ return clientBuilder.build();
}
+ IHttpAsyncClientBuilder getHttpClientBuilder() {
+ return new HttpAsyncClientBuilderWrapper();
+ }
} \ No newline at end of file
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index 06ff570c..91c4c334 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -16,51 +16,44 @@
package org.onap.dcaegen2.collectors.datafile.service.producer;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
import org.apache.http.HttpResponse;
-import org.apache.http.StatusLine;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -68,32 +61,24 @@ import reactor.test.StepVerifier;
*/
class DmaapProducerReactiveHttpClientTest {
- private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
- private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
- private static final String NAME_JSON_TAG = "name";
- private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
-
private static final String HOST = "54.45.33.2";
private static final String HTTPS_SCHEME = "https";
private static final int PORT = 1234;
- private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
- private static final String URI_SEPARATOR = "/";
- private static final String PUBLISH_TOPIC = "publish";
- private static final String DEFAULT_FEED_ID = "1";
- private static final String FILE_CONTENT = "Just a string.";
+ private static final String USER_NAME = "dradmin";
+ private static final int TWO_SECOND_TIMEOUT = 2000;
- private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+
+ private DmaapProducerReactiveHttpClient producerClientUnderTestSpy;
private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
- private ConsumerDmaapModel consumerDmaapModel;
- private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class);
+ private IHttpAsyncClientBuilder clientBuilderMock;
+
private CloseableHttpAsyncClient clientMock;
- private HttpResponse responseMock = mock(HttpResponse.class);
@SuppressWarnings("unchecked")
private Future<HttpResponse> futureMock = mock(Future.class);
- private StatusLine statusLine = mock(StatusLine.class);
- private InputStream fileStream;
@BeforeEach
void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
@@ -102,82 +87,114 @@ class DmaapProducerReactiveHttpClientTest {
when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin");
when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin");
- when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn(APPLICATION_OCTET_STREAM_CONTENT_TYPE);
- when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn(PUBLISH_TOPIC);
-
- // @formatter:off
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName("NrRadio")
- .vendorName("Ericsson")
- .lastEpochMicrosec("8745745764578")
- .sourceName("oteNB5309")
- .startEpochMicrosec("8745745764578")
- .timeZoneOffset("UTC+05:00")
- .name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz")
- .internalLocation("target/A20161224.1030-1045.bin.gz")
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
- .build();
- //formatter:on
-
- dmaapProducerReactiveHttpClient = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock));
- dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock);
+
+ producerClientUnderTestSpy = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock));
+
+ clientBuilderMock = mock(IHttpAsyncClientBuilder.class);
clientMock = mock(CloseableHttpAsyncClient.class);
- doReturn(clientMock).when(dmaapProducerReactiveHttpClient).createWebClient();
}
@Test
- void getHttpResponse_Success() throws Exception {
- mockWebClientDependantObject();
+ void getHttpResponseWithRederict_Success() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setSSLHostnameVerifier(any(NoopHostnameVerifier.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+ HttpResponse responseMock = mock(HttpResponse.class);
+ when(futureMock.get()).thenReturn(responseMock);
- HttpPut httpPut = new HttpPut();
- httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
+ HttpGet request = new HttpGet();
+ producerClientUnderTestSpy.getDmaapProducerResponseWithRedirect(request, CONTEXT_MAP);
- URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
- .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
- httpPut.setURI(expectedUri);
+ verify(clientBuilderMock).setSSLContext(any(SSLContext.class));
+ verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
+ verify(clientBuilderMock).setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ verify(clientBuilderMock).build();
+ verifyNoMoreInteractions(clientBuilderMock);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
- metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
- metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
- httpPut.addHeader(X_DMAAP_DR_META, metaData.toString());
+ verify(clientMock).start();
+ verify(clientMock).close();
- String plainCreds = "dradmin" + ":" + "dradmin";
- byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
- byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
- String base64Creds = new String(base64CredsBytes);
- httpPut.addHeader("Authorization", "Basic " + base64Creds);
+ verify(futureMock).get();
+ verifyNoMoreInteractions(futureMock);
+ }
- fileStream.reset();
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap))
- .expectNext(HttpStatus.OK).verifyComplete();
+ @Test
+ void getHttpResponseWithCustomTimeout_Success() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+ HttpResponse responseMock = mock(HttpResponse.class);
+ when(futureMock.get()).thenReturn(responseMock);
- verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME));
- InputStream fileInputStream = fileSystemResourceMock.getInputStream();
- httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ HttpGet request = new HttpGet();
+ producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT, CONTEXT_MAP);
+
+ ArgumentCaptor<RequestConfig> requestConfigCaptor = ArgumentCaptor.forClass(RequestConfig.class);
+ verify(clientBuilderMock).setSSLContext(any(SSLContext.class));
+ verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
+ verify(clientBuilderMock).setDefaultRequestConfig(requestConfigCaptor.capture());
+ RequestConfig requestConfig = requestConfigCaptor.getValue();
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getSocketTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectionRequestTimeout());
+ verify(clientBuilderMock).build();
+ verifyNoMoreInteractions(clientBuilderMock);
+
+ verify(clientMock).start();
+ verify(clientMock).close();
+
+ verify(futureMock).get();
+ verifyNoMoreInteractions(futureMock);
}
@Test
- void getHttpResponse_Fail() throws Exception {
- Map<String, String> contextMap = new HashMap<>();
- doReturn(futureMock).when(clientMock).execute(any(), any());
- doThrow(new InterruptedException()).when(futureMock).get();
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap)) //
- .expectError() //
- .verify(); //
+ public void getResponseWithException_throwsException() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ HttpPut request = new HttpPut();
+ when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+
+ try {
+ when(futureMock.get()).thenThrow(new InterruptedException("Interrupted"));
+
+ producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT,
+ CONTEXT_MAP);
+
+ fail("Should have got an exception.");
+ } catch (DatafileTaskException e) {
+ assertTrue(e.getCause() instanceof InterruptedException);
+ assertEquals("Interrupted", e.getCause().getMessage());
+ } catch (Exception e) {
+ fail("Wrong exception");
+ }
+
+ verify(clientMock).start();
+ verify(clientMock).close();
}
- private void mockWebClientDependantObject()
- throws IOException, InterruptedException, ExecutionException {
- fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
- when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream);
- when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
- when(futureMock.get()).thenReturn(responseMock);
- when(responseMock.getStatusLine()).thenReturn(statusLine);
- when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK);
+ @Test
+ public void addCredentialsToHead_success() {
+ HttpPut request = new HttpPut();
+
+ producerClientUnderTestSpy.addUserCredentialsToHead(request);
+ String plainCreds = USER_NAME + ":" + USER_NAME;
+ byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
+ byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
+ String base64Creds = "Basic " + new String(base64CredsBytes);
+ Header[] authorizationHeaders = request.getHeaders("Authorization");
+ assertEquals(base64Creds, authorizationHeaders[0].getValue());
+ }
+
+ @Test
+ public void getBaseUri_success() {
+ URI uri = producerClientUnderTestSpy.getBaseUri().build();
+ assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString());
}
}