aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormaximesson <maxime.bonneau@est.tech>2019-03-21 15:58:55 +0000
committermaximesson <maxime.bonneau@est.tech>2019-03-21 15:58:55 +0000
commit4bd281390ed24b278846775c1157f82db81fddbe (patch)
tree1ffaf2384e830e9659e379aab0c833732924ccce
parent6870154043d73d527cc42aca7ade7e49aa961476 (diff)
Add check to DataRouter if file has been published
For each file in the FileReady message that DFC does not know if it has been published yet, it should ask DataRouter if it has been published already to avoid downloading and publishing a file more than once. Change-Id: I18117a6e968ec929aa255052a4c44f890a8ed39d Issue-ID: DCAEGEN2-1256 Signed-off-by: maximesson <maxime.bonneau@est.tech>
-rw-r--r--.gitignore4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java58
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java19
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java126
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java119
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java38
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java)0
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java87
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java519
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java208
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java226
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java22
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java175
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java20
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java2
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java35
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java5
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java87
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java33
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java4
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java67
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java43
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java37
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java28
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java164
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java215
30 files changed, 1512 insertions, 917 deletions
diff --git a/.gitignore b/.gitignore
index d03f9a95..37707c82 100644
--- a/.gitignore
+++ b/.gitignore
@@ -45,7 +45,7 @@ buildNumber.properties
.mvn/timing.properties
.mvn/wrapper/maven-wrapper.jar
-# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
-hs_err_pid*
+# CheckStyle files
+.checkstyle
opt/ \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 82c390f7..f89a1012 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -42,6 +42,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.stereotype.Component;
/**
+ * Holds all configuration for the DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -79,8 +81,10 @@ public class AppConfig {
return ftpesConfiguration;
}
+ /**
+ * Reads the configuration from file.
+ */
public void loadConfigurationFromFile() {
-
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
JsonParser parser = new JsonParser();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index b4dc6353..52723330 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.apache.commons.lang3.StringUtils;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
@@ -40,11 +43,15 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
+ * Api for starting and stopping DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Configuration
@EnableScheduling
@@ -63,11 +70,18 @@ public class SchedulerConfig {
private final ScheduledTasks scheduledTask;
private final CloudConfiguration cloudConfiguration;
+ /**
+ * Constructor.
+ *
+ * @param taskScheduler The scheduler used to schedule the tasks.
+ * @param scheduledTasks The scheduler that will actually handle the tasks.
+ * @param cloudConfiguration The DFC configuration.
+ */
@Autowired
- public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask,
- CloudConfiguration cloudConfiguration) {
+ public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks,
+ CloudConfiguration cloudConfiguration) {
this.taskScheduler = taskScheduler;
- this.scheduledTask = scheduledTask;
+ this.scheduledTask = scheduledTasks;
this.cloudConfiguration = cloudConfiguration;
}
@@ -84,7 +98,7 @@ public class SchedulerConfig {
logger.info(EXIT, "Stopped Datafile workflow");
MDC.clear();
return Mono.defer(() -> Mono
- .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+ .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
}
/**
@@ -106,12 +120,14 @@ public class SchedulerConfig {
contextMap = MDC.getCopyOfContextMap();
logger.info(ENTRY, "Start scheduling Datafile workflow");
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(),
- SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
- SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
- SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap),
+ Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
+ scheduledFutureList.add(
+ taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList
+ .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
+ SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
return true;
} else {
@@ -119,4 +135,4 @@ public class SchedulerConfig {
}
}
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index a8f79ea1..af45cc99 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -88,11 +88,17 @@ public class JsonMessageParser {
}
}
+ /**
+ * Parses the Json message and returns a stream of messages.
+ *
+ * @param rawMessage the Json message to parse.
+ * @return a <code>Flux</code> containing messages.
+ */
public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
}
- public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+ Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
JsonParser jsonParser = new JsonParser();
return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
: element.isJsonObject() ? Optional.of((JsonObject) element)
@@ -136,13 +142,11 @@ public class JsonMessageParser {
List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
if (!allFileDataFromJson.isEmpty()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
- // @formatter:off
- return Mono.just(ImmutableFileReadyMessage.builder()
- .pnfName(messageMetaData.sourceName())
- .messageMetaData(messageMetaData)
- .files(allFileDataFromJson)
+ return Mono.just(ImmutableFileReadyMessage.builder() //
+ .pnfName(messageMetaData.sourceName()) //
+ .messageMetaData(messageMetaData) //
+ .files(allFileDataFromJson) //
.build());
- // @formatter:on
} else {
return Mono.empty();
}
@@ -168,18 +172,16 @@ public class JsonMessageParser {
// version.
getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
- // @formatter:off
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
- .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
- .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
- .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues))
- .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues))
- .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues))
- .changeIdentifier(changeIdentifier)
- .changeType(changeType)
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) //
+ .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) //
+ .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) //
+ .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) //
+ .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) //
+ .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) //
+ .changeIdentifier(changeIdentifier) //
+ .changeType(changeType) //
.build();
- // @formatter:on
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
@@ -231,16 +233,14 @@ public class JsonMessageParser {
logger.error("Unable to collect file from xNF.", e);
return Optional.empty();
}
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .name(getValueFromJson(fileInfo, NAME, missingValues))
- .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
- .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
- .location(location)
- .scheme(scheme)
- .compression(getValueFromJson(data, COMPRESSION, missingValues))
+ FileData fileData = ImmutableFileData.builder() //
+ .name(getValueFromJson(fileInfo, NAME, missingValues)) //
+ .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) //
+ .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) //
+ .location(location) //
+ .scheme(scheme) //
+ .compression(getValueFromJson(data, COMPRESSION, missingValues)) //
.build();
- // @formatter:on
if (missingValues.isEmpty()) {
return Optional.of(fileData);
}
@@ -250,8 +250,8 @@ public class JsonMessageParser {
}
/**
- * Gets data from the event name, defined as:
- * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
+ * Gets data from the event name.
+ * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
* Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 2cb84112..e2dca182 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -13,6 +13,7 @@
* the License.
* ============LICENSE_END========================================================================
*/
+
package org.onap.dcaegen2.collectors.datafile.service;
import java.nio.file.Path;
@@ -29,14 +30,30 @@ import java.util.Map;
public class PublishedFileCache {
private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
+ /**
+ * Adds a file to the cache.
+ *
+ * @param path the name of the file to add.
+ * @return <code>null</code> if the file is not already in the cache.
+ */
public Instant put(Path path) {
return publishedFiles.put(path, Instant.now());
}
+ /**
+ * Removes a file from the cache.
+ *
+ * @param localFileName name of the file to remove.
+ */
public void remove(Path localFileName) {
publishedFiles.remove(localFileName);
}
+ /**
+ * Removes files 24 hours older than the given instant.
+ *
+ * @param now the instant will determine which files that will be purged.
+ */
public void purge(Instant now) {
for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) {
Map.Entry<Path, Instant> pair = it.next();
@@ -46,7 +63,7 @@ public class PublishedFileCache {
}
}
- public int size() {
+ int size() {
return publishedFiles.size();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 57edc364..0fef9ab4 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,24 +1,46 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
@@ -26,17 +48,30 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+
import reactor.core.publisher.Mono;
/**
+ * Publishes a file to the DataRouter.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class DataRouterPublisher {
+ private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+ private static final String CONTENT_TYPE = "application/octet-stream";
+ private static final String NAME_JSON_TAG = "name";
+ private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
+ private static final String PUBLISH_TOPIC = "publish";
+ private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
+ private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -44,25 +79,70 @@ public class DataRouterPublisher {
/**
- * Publish one file
- * @param consumerDmaapModel information about the file to publish
- * @param maxNumberOfRetries the maximal number of retries if the publishing fails
- * @param firstBackoffTimeout the time to delay the first retry
+ * Publish one file.
+ *
+ * @param model information about the file to publish
+ * @param numRetries the maximal number of retries if the publishing fails
+ * @param firstBackoff the time to delay the first retry
* @return the HTTP response status as a string
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Method called with arg {}", model);
- DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
+ dmaapProducerReactiveHttpClient = resolveClient();
- //@formatter:off
return Mono.just(model)
.cache()
- .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap))
- .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap))
+ .flatMap(m -> publishFile(m, contextMap)) //
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
.retryBackoff(numRetries, firstBackoff);
- //@formatter:on
+ }
+
+ private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) {
+ logger.trace("Entering publishFile with {}", consumerDmaapModel);
+ try {
+ HttpPut put = new HttpPut();
+ String requestId = MDC.get(REQUEST_ID);
+ put.addHeader(X_ONAP_REQUEST_ID, requestId);
+ String invocationId = UUID.randomUUID().toString();
+ put.addHeader(X_INVOCATION_ID, invocationId);
+
+ prepareHead(consumerDmaapModel, put);
+ prepareBody(consumerDmaapModel, put);
+ dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+
+ HttpResponse response =
+ dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
+ logger.trace(response.toString());
+ return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+ } catch (Exception e) {
+ logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ return Mono.error(e);
+ }
+ }
+
+ private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+ put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+ metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
+ put.addHeader(X_DMAAP_DR_META, metaData.toString());
+ put.setURI(getPublishUri(model.getInternalLocation().getFileName().toString()));
+ }
+
+ private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ Path fileLocation = model.getInternalLocation();
+ try (InputStream fileInputStream = createInputStream(fileLocation)) {
+ put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ }
+ }
+
+ private URI getPublishUri(String fileName) {
+ return dmaapProducerReactiveHttpClient.getBaseUri() //
+ .pathSegment(PUBLISH_TOPIC) //
+ .pathSegment(DEFAULT_FEED_ID) //
+ .pathSegment(fileName).build();
}
private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
@@ -77,6 +157,10 @@ public class DataRouterPublisher {
}
}
+ InputStream createInputStream(Path filePath) throws IOException {
+ FileSystemResource realResource = new FileSystemResource(filePath);
+ return realResource.getInputStream();
+ }
DmaapPublisherConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapPublisherConfiguration();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index af4670e3..fb27a579 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -20,6 +20,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import reactor.core.publisher.Mono;
/**
@@ -52,12 +54,10 @@ public class FileCollector {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
- //@formatter:off
- return Mono.just(fileData)
- .cache()
- .flatMap(fd -> collectFile(fileData, metaData, contextMap))
- .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
- //@formatter:on
+ return Mono.just(fileData) //
+ .cache() //
+ .flatMap(fd -> collectFile(fileData, metaData, contextMap)) //
+ .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
}
private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
@@ -92,22 +92,20 @@ public class FileCollector {
private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
String location = fileData.location();
- // @formatter:off
- return ImmutableConsumerDmaapModel.builder()
- .productName(metaData.productName())
- .vendorName(metaData.vendorName())
- .lastEpochMicrosec(metaData.lastEpochMicrosec())
- .sourceName(metaData.sourceName())
- .startEpochMicrosec(metaData.startEpochMicrosec())
- .timeZoneOffset(metaData.timeZoneOffset())
- .name(fileData.name())
- .location(location)
- .internalLocation(localFile.toString())
- .compression(fileData.compression())
- .fileFormatType(fileData.fileFormatType())
- .fileFormatVersion(fileData.fileFormatVersion())
+ return ImmutableConsumerDmaapModel.builder() //
+ .productName(metaData.productName()) //
+ .vendorName(metaData.vendorName()) //
+ .lastEpochMicrosec(metaData.lastEpochMicrosec()) //
+ .sourceName(metaData.sourceName()) //
+ .startEpochMicrosec(metaData.startEpochMicrosec()) //
+ .timeZoneOffset(metaData.timeZoneOffset()) //
+ .name(fileData.name()) //
+ .location(location) //
+ .internalLocation(localFile) //
+ .compression(fileData.compression()) //
+ .fileFormatType(fileData.fileFormatType()) //
+ .fileFormatVersion(fileData.fileFormatVersion()) //
.build();
- // @formatter:on
}
SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
new file mode 100644
index 00000000..0729caa0
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -0,0 +1,119 @@
+/*-
+* ============LICENSE_START=======================================================
+* Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Bean used to check with DataRouter if a file has been published.
+ *
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedChecker {
+ private static final String FEEDLOG_TOPIC = "feedlog";
+ private static final String DEFAULT_FEED_ID = "1";
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private AppConfig appConfig;
+
+ /**
+ * Constructor.
+ *
+ * @param appConfig The DFC configuration.
+ */
+ public PublishedChecker(AppConfig appConfig) {
+ this.appConfig = appConfig;
+ }
+
+ /**
+ * Checks with DataRouter if the given file has been published already.
+ *
+ * @param fileName the name of the file used when it is published.
+ *
+ * @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
+ */
+ public boolean execute(String fileName, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ DmaapProducerReactiveHttpClient producerClient = resolveClient();
+
+ HttpGet getRequest = new HttpGet();
+ String requestId = MDC.get(REQUEST_ID);
+ getRequest.addHeader(X_ONAP_REQUEST_ID, requestId);
+ String invocationId = UUID.randomUUID().toString();
+ getRequest.addHeader(X_INVOCATION_ID, invocationId);
+ getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
+ producerClient.addUserCredentialsToHead(getRequest);
+
+ try {
+ HttpResponse response =
+ producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
+
+ logger.trace(response.toString());
+ int status = response.getStatusLine().getStatusCode();
+ HttpEntity entity = response.getEntity();
+ InputStream content = entity.getContent();
+ String body = IOUtils.toString(content);
+ return HttpStatus.SC_OK == status && !"[]".equals(body);
+ } catch (Exception e) {
+ logger.warn("Unable to check if file has been published.", e);
+ return false;
+ }
+ }
+
+ private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) {
+ return producerClient.getBaseUri() //
+ .pathSegment(FEEDLOG_TOPIC) //
+ .pathSegment(DEFAULT_FEED_ID) //
+ .queryParam("type", "pub") //
+ .queryParam("filename", fileName) //
+ .build();
+ }
+
+ protected DmaapPublisherConfiguration resolveConfiguration() {
+ return appConfig.getDmaapPublisherConfiguration();
+ }
+
+ protected DmaapProducerReactiveHttpClient resolveClient() {
+ return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 28963377..d41e5c25 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,13 +18,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
@@ -37,14 +37,15 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the
- * message router, fetch new files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
+ * files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
@@ -52,7 +53,7 @@ public class ScheduledTasks {
private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
- /** Data needed for fetching of one file */
+ /** Data needed for fetching of one file. */
private class FileCollectionData {
final FileData fileData;
final MessageMetaData metaData;
@@ -64,6 +65,7 @@ public class ScheduledTasks {
}
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
private final Scheduler scheduler =
@@ -96,17 +98,17 @@ public class ScheduledTasks {
.parallel(getParallelism()) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.flatMap(this::createFileCollectionTask) //
- .filter(this::shouldBePublished) //
+ .filter(fileData -> shouldBePublished(fileData, contextMap)) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
.flatMap(model -> publishToDataRouter(model, contextMap)) //
- .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) //
+ .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
}
/**
- * called in regular intervals to remove out-dated cached information
+ * called in regular intervals to remove out-dated cached information.
*/
public void purgeCachedInformation(Instant now) {
alreadyPublishedFiles.purge(now);
@@ -144,8 +146,13 @@ public class ScheduledTasks {
return Flux.fromIterable(fileCollects);
}
- private boolean shouldBePublished(FileCollectionData task) {
- return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
+ private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) {
+ boolean result = false;
+ Path localFileName = task.fileData.getLocalFileName();
+ if (alreadyPublishedFiles.put(localFileName) == null) {
+ result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap);
+ }
+ return result;
}
private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
@@ -156,7 +163,7 @@ public class ScheduledTasks {
MdcVariables.setMdcContextMap(contextMap);
return createFileCollector()
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
- contextMap)
+ contextMap)
.onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
}
@@ -174,10 +181,9 @@ public class ScheduledTasks {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
- DataRouterPublisher publisherTask = createDataRouterPublisher();
MdcVariables.setMdcContextMap(contextMap);
- return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
+ return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
.onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
}
@@ -185,7 +191,7 @@ public class ScheduledTasks {
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
- Path internalFileName = Paths.get(model.getInternalLocation());
+ Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
@@ -223,6 +229,10 @@ public class ScheduledTasks {
}
}
+ PublishedChecker createPublishedChecker() {
+ return new PublishedChecker(applicationConfiguration);
+ }
+
int getCurrentNumberOfTasks() {
return currentNumberOfTasks.get();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index 2c136304..2c136304 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
index acae1e6e..b67fac23 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
@@ -28,56 +28,49 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
class CloudConfigParserTest {
-
- private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG =
- //@formatter:on
- new ImmutableDmaapConsumerConfiguration.Builder()
- .timeoutMs(-1)
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
- .dmaapPortNumber(2222)
- .dmaapContentType("application/json")
- .messageLimit(-1)
- .dmaapProtocol("http")
- .consumerId("C12")
- .consumerGroup("OpenDCAE-c12")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+ private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+ new ImmutableDmaapConsumerConfiguration.Builder() //
+ .timeoutMs(-1) //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("admin") //
+ .dmaapUserPassword("admin") //
+ .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .dmaapPortNumber(2222) //
+ .dmaapContentType("application/json") //
+ .messageLimit(-1) //
+ .dmaapProtocol("http") //
+ .consumerId("C12") //
+ .consumerGroup("OpenDCAE-c12") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
- //@formatter:off
-
- private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG =
- //@formatter:on
- new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapTopicName("publish")
- .dmaapUserPassword("dradmin")
- .dmaapPortNumber(3907)
- .dmaapProtocol("https")
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapUserName("dradmin")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+
+ private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+ new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapTopicName("publish") //
+ .dmaapUserPassword("dradmin") //
+ .dmaapPortNumber(3907) //
+ .dmaapProtocol("https") //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("dradmin") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
- //@formatter:off
-
- private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION =
- //@formatter:on
- new ImmutableFtpesConfig.Builder()
- .keyCert("/config/ftpKey.jks")
- .keyPassword("secret")
- .trustedCA("config/cacerts")
- .trustedCAPassword("secret")
+
+ private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
+ new ImmutableFtpesConfig.Builder() //
+ .keyCert("/config/ftpKey.jks") //
+ .keyPassword("secret") //
+ .trustedCA("config/cacerts") //
+ .trustedCAPassword("secret") //
.build();
- //@formatter:off
private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index f7b83297..b33180fa 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -68,48 +68,47 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -123,48 +122,47 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String parsedString = message.getParsed();
String messageString = "[" + parsedString + "," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -178,21 +176,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutLocation_noMessage() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -206,48 +203,47 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String parsedString = message.getParsed();
String messageString = "[{\"event\":{}}," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
@@ -258,21 +254,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithFaultyEventName_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName("Faulty event name")
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName("Faulty event name") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -286,21 +281,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutName_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -314,14 +308,13 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() {
- // @formatter:off
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .build();
- // @formatter:on
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .build();
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -335,21 +328,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutCompression_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
- // @formatter:on
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -363,21 +355,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutFileFormatType_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -391,55 +382,54 @@ class JsonMessageParserTest {
@Test
void whenPassingOneCorrectJsonWithoutFileFormatVersionAndOneCorrect_oneFileData() {
- // @formatter:off
- AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .build();
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalFaultyField)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalFaultyField) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -453,14 +443,13 @@ class JsonMessageParserTest {
@Test
void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
- // @formatter:off
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier("PM_MEAS_FILES_INVALID")
- .changeType("FileReady_INVALID")
- .notificationFieldsVersion("1.0_INVALID")
- .build();
- // @formatter:on
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier("PM_MEAS_FILES_INVALID") //
+ .changeType("FileReady_INVALID") //
+ .notificationFieldsVersion("1.0_INVALID") //
+ .build();
+
String incorrectMessageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -486,21 +475,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
.compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(INCORRECT_CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(INCORRECT_CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -514,21 +502,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index f88e301d..a695e20d 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
@@ -74,7 +76,7 @@ public class DMaaPMessageConsumerTaskImplTest {
private static final String PORT_22 = "22";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME;
+ private static final Path LOCAL_FILE_LOCATION = Paths.get("target/" + PM_FILE_NAME);
private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
private static final String GZIP_COMPRESSION = "gzip";
@@ -86,7 +88,7 @@ public class DMaaPMessageConsumerTaskImplTest {
private static AppConfig appConfig;
private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
private DMaaPMessageConsumerTask messageConsumerTask;
- private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+ private DMaaPConsumerReactiveHttpClient httpClientMock;
private static String ftpesMessageString;
private static FileData ftpesFileData;
@@ -96,156 +98,163 @@ public class DMaaPMessageConsumerTaskImplTest {
private static FileData sftpFileData;
private static FileReadyMessage expectedSftpMessage;
+ /**
+ * Sets up data for the test.
+ */
@BeforeAll
public static void setUp() {
- //@formatter:off
- dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
- .consumerGroup("OpenDCAE-c12")
- .consumerId("c12")
- .dmaapContentType("application/json")
- .dmaapHostName("54.45.33.2")
- .dmaapPortNumber(1234).dmaapProtocol("https")
- .dmaapUserName("Datafile")
- .dmaapUserPassword("Datafile")
- .dmaapTopicName("unauthenticated.NOTIFICATION")
- .timeoutMs(-1)
- .messageLimit(-1)
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+ dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() //
+ .consumerGroup("OpenDCAE-c12") //
+ .consumerId("c12") //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234).dmaapProtocol("https") //
+ .dmaapUserName("Datafile") //
+ .dmaapUserPassword("Datafile") //
+ .dmaapTopicName("unauthenticated.NOTIFICATION") //
+ .timeoutMs(-1) //
+ .messageLimit(-1) //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
appConfig = mock(AppConfig.class);
- AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(FTPES_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
- .notificationFieldsVersion("1.0")
- .addAdditionalField(ftpesAdditionalField)
+ JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
+ .notificationFieldsVersion("1.0") //
+ .addAdditionalField(ftpesAdditionalField) //
.build();
ftpesMessageString = ftpesJsonMessage.toString();
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
.build();
- ftpesFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ftpesFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(FTPES_LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(ftpesFileData);
- expectedFtpesMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ expectedFtpesMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(SFTP_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(SFTP_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
- .notificationFieldsVersion("1.0")
- .addAdditionalField(sftpAdditionalField)
+ JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
+ .notificationFieldsVersion("1.0") //
+ .addAdditionalField(sftpAdditionalField) //
.build();
sftpMessageString = sftpJsonMessage.toString();
- sftpFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(SFTP_LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ sftpFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(SFTP_LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(PM_FILE_NAME) //
+ .location(FTPES_LOCATION) //
+ .internalLocation(LOCAL_FILE_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
listOfConsumerDmaapModel.add(consumerDmaapModel);
files = new ArrayList<>();
files.add(sftpFileData);
- expectedSftpMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ expectedSftpMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- //@formatter:on
}
@Test
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
prepareMocksForDmaapConsumer("", null);
- StepVerifier.create(messageConsumerTask.execute()).expectSubscription()
- .expectError(DatafileTaskException.class).verify();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectSubscription() //
+ .expectError(DatafileTaskException.class) //
+ .verify();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
}
@Test
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
- StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectNext(expectedFtpesMessage) //
+ .verifyComplete();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verifyNoMoreInteractions(httpClientMock);
}
@Test
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
- StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectNext(expectedSftpMessage) //
+ .verifyComplete();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verifyNoMoreInteractions(httpClientMock);
}
private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
Mono<String> messageAsMono = Mono.just(message);
JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
- dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
- when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
+ httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
+ when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
if (!message.isEmpty()) {
when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
@@ -255,9 +264,8 @@ public class DMaaPMessageConsumerTaskImplTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumerTask =
- spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock));
+ messageConsumerTask = spy(new DMaaPMessageConsumerTask(appConfig, httpClientMock, jsonMessageParserMock));
when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
- doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient();
+ doReturn(httpClientMock).when(messageConsumerTask).resolveClient();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index d612d17c..ed8b93f1 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -16,8 +16,9 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -25,19 +26,36 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Mono;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
import reactor.test.StepVerifier;
/**
@@ -52,31 +70,41 @@ class DataRouterPublisherTest {
private static final String START_EPOCH_MICROSEC = "8745745764578";
private static final String TIME_ZONE_OFFSET = "UTC+05:00";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+ private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + PM_FILE_NAME;
+
+ private static final String COMPRESSION = "gzip";
+ private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String FILE_FORMAT_VERSION = "V10";
+ private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+
+ private static final String HOST = "54.45.33.2";
+ private static final String HTTPS_SCHEME = "https";
+ private static final int PORT = 1234;
+ private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+ private static final String PUBLISH_TOPIC = "publish";
+ private static final String FEED_ID = "1";
+ private static final String FILE_CONTENT = "Just a string.";
+
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
private static ConsumerDmaapModel consumerDmaapModel;
- private static DataRouterPublisher dmaapPublisherTask;
- private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
+ private static DmaapProducerReactiveHttpClient httpClientMock;
private static AppConfig appConfig;
- private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static DataRouterPublisher publisherTaskUnderTestSpy;
+
+ /**
+ * Sets up data for tests.
+ */
@BeforeAll
public static void setUp() {
+ when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+ when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
- dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType("application/json") //
- .dmaapHostName("54.45.33.2") //
- .dmaapPortNumber(1234) //
- .dmaapProtocol("https") //
- .dmaapUserName("DFC") //
- .dmaapUserPassword("DFC") //
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build(); //
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
+ consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
.productName(PRODUCT_NAME) //
.vendorName(VENDOR_NAME) //
.lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
@@ -84,61 +112,145 @@ class DataRouterPublisherTest {
.startEpochMicrosec(START_EPOCH_MICROSEC) //
.timeZoneOffset(TIME_ZONE_OFFSET) //
.name(PM_FILE_NAME) //
- .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) //
- .internalLocation("target/" + PM_FILE_NAME) //
+ .location(FTPES_ADDRESS) //
+ .internalLocation(Paths.get("target/" + LOCAL_FILE_NAME)) //
.compression("gzip") //
- .fileFormatType("org.3GPP.32.435#measCollec") //
- .fileFormatVersion("V10") //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build(); //
appConfig = mock(AppConfig.class);
-
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
}
@Test
- public void whenPassedObjectFits_ReturnsCorrectStatus() {
- prepareMocksForTests(Mono.just(HttpStatus.OK));
+ public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
+
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
+
+ ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
+ verify(httpClientMock).getBaseUri();
+ verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
+ verifyNoMoreInteractions(httpClientMock);
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectNext(consumerDmaapModel).verifyComplete();
+ HttpPut actualPut = (HttpPut) requestCaptor.getValue();
+ URI actualUri = actualPut.getURI();
+ assertEquals(HTTPS_SCHEME, actualUri.getScheme());
+ assertEquals(HOST, actualUri.getHost());
+ assertEquals(PORT, actualUri.getPort());
+ Path actualPath = Paths.get(actualUri.getPath());
+ assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
+ assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
+ assertTrue(LOCAL_FILE_NAME.equals(actualPath.getName(2).toString()));
- verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ Header[] contentHeaders = actualPut.getHeaders("content-type");
+ assertEquals(APPLICATION_OCTET_STREAM_CONTENT_TYPE, contentHeaders[0].getValue());
+
+ Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META);
+ Map<String, String> metaHash = getMetaDataAsMap(metaHeaders);
+ assertTrue(10 == metaHash.size());
+ assertEquals(PRODUCT_NAME, metaHash.get("productName"));
+ assertEquals(VENDOR_NAME, metaHash.get("vendorName"));
+ assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec"));
+ assertEquals(SOURCE_NAME, metaHash.get("sourceName"));
+ assertEquals(START_EPOCH_MICROSEC, metaHash.get("startEpochMicrosec"));
+ assertEquals(TIME_ZONE_OFFSET, metaHash.get("timeZoneOffset"));
+ assertEquals(COMPRESSION, metaHash.get("compression"));
+ assertEquals(FTPES_ADDRESS, metaHash.get("location"));
+ assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
+ assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
+ }
+
+ @Test
+ void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
+ prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
+
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
}
@Test
- public void whenPassedObjectFits_firstFailsThenSucceeds() {
- prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
+ public void whenPassedObjectFits_firstFailsThenSucceeds() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
+ Integer.valueOf(HttpStatus.OK.value()));
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectNext(consumerDmaapModel).verifyComplete();
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ verify(httpClientMock, times(2)).getBaseUri();
+ verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
+ verifyNoMoreInteractions(httpClientMock);
}
@Test
- public void whenPassedObjectFits_firstFailsThenFails() {
- prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
+ public void whenPassedObjectFits_firstFailsThenFails() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
+ Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectErrorMessage("Retries exhausted: 1/1").verify();
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+ .expectErrorMessage("Retries exhausted: 1/1") //
+ .verify();
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ verify(httpClientMock, times(2)).getBaseUri();
+ verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
+ verifyNoMoreInteractions(httpClientMock);
}
@SafeVarargs
- final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
- dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any(), any())).thenReturn(firstResponse,
- nextHttpResponses);
-
- dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
- when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
- doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
+ final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
+ throws Exception {
+ httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+ when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration();
+ doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient();
+
+ UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
+ when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+
+ HttpResponse httpResponseMock = mock(HttpResponse.class);
+ if (exception == null) {
+ when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
+ .thenReturn(httpResponseMock);
+ } else {
+ when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
+ .thenThrow(exception).thenReturn(httpResponseMock);
+ }
+ StatusLine statusLineMock = mock(StatusLine.class);
+ when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+ when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses);
+
+ InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
+ doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", LOCAL_FILE_NAME));
+ }
+
+ private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) {
+ Map<String, String> metaHash = new HashMap<>();
+ String actualMetaData = metaHeaders[0].getValue();
+ actualMetaData = actualMetaData.substring(1, actualMetaData.length() - 1);
+ actualMetaData = actualMetaData.replace("\"", "");
+ String[] commaSplitedMetaData = actualMetaData.split(",");
+ for (int i = 0; i < commaSplitedMetaData.length; i++) {
+ String[] keyValuePair = commaSplitedMetaData[i].split(":");
+ if (keyValuePair.length > 2) {
+ List<String> arrayKeyValuePair = new ArrayList<>(keyValuePair.length);
+ for (int j = 1; j < keyValuePair.length; j++) {
+ arrayKeyValuePair.add(keyValuePair[j]);
+ }
+ keyValuePair[1] = String.join(":", arrayKeyValuePair);
+ }
+ metaHash.put(keyValuePair[0], keyValuePair[1]);
+ }
+ return metaHash;
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index c266d50e..fb49c860 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -134,7 +134,7 @@ public class FileCollectorTest {
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
.location(location)
- .internalLocation(LOCAL_FILE_LOCATION.toString())
+ .internalLocation(LOCAL_FILE_LOCATION)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
@@ -161,7 +161,8 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -179,7 +180,9 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier
+ .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
@@ -187,7 +190,9 @@ public class FileCollectorTest {
fileData = createFileData(SFTP_LOCATION, Scheme.SFTP);
expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier
+ .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+ contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
@@ -206,7 +211,8 @@ public class FileCollectorTest {
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 3/3").verify();
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -223,10 +229,10 @@ public class FileCollectorTest {
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
-
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
new file mode 100644
index 00000000..3e3c2ed6
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
@@ -0,0 +1,175 @@
+/*-
+* ============LICENSE_START=======================================================
+* Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
+/**
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedCheckerTest {
+ private static final String EMPTY_CONTENT = "[]";
+ private static final String FEEDLOG_TOPIC = "feedlog";
+ private static final String FEED_ID = "1";
+ private static final String HTTPS_SCHEME = "https";
+ private static final String HOST = "54.45.33.2";
+ private static final int PORT = 1234;
+ private static final String SOURCE_NAME = "oteNB5309";
+ private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME;
+
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+ private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static AppConfig appConfigMock;
+ private DmaapProducerReactiveHttpClient httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+
+ private PublishedChecker publishedCheckerUnderTestSpy;
+
+ /**
+ * Sets up data for the tests.
+ */
+ @BeforeAll
+ public static void setUp() {
+ when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+ when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+
+ appConfigMock = mock(AppConfig.class);
+ when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ }
+
+ @Test
+ public void executeWhenNotPublished_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+
+ ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
+ verify(httpClientMock).getBaseUri();
+ verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), anyInt(), any());
+ verifyNoMoreInteractions(httpClientMock);
+
+ HttpUriRequest getRequest = requestCaptor.getValue();
+ assertTrue(getRequest instanceof HttpGet);
+ URI actualUri = getRequest.getURI();
+ assertEquals(HTTPS_SCHEME, actualUri.getScheme());
+ assertEquals(HOST, actualUri.getHost());
+ assertEquals(PORT, actualUri.getPort());
+ Path actualPath = Paths.get(actualUri.getPath());
+ assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString()));
+ assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
+ String actualQuery = actualUri.getQuery();
+ assertTrue(actualQuery.contains("type=pub"));
+ assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME));
+ }
+
+ @Test
+ public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+ }
+
+ @Test
+ public void executeWhenPublished_returnsTrue() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertTrue(isPublished);
+ }
+
+ @Test
+ public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+ }
+
+ final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception {
+ publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock));
+
+ doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration();
+ doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient();
+
+ UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
+ when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+
+ HttpResponse httpResponseMock = mock(HttpResponse.class);
+ if (exception == null) {
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ .thenReturn(httpResponseMock);
+ } else {
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ .thenThrow(exception);
+ }
+ HttpEntity httpEntityMock = mock(HttpEntity.class);
+ StatusLine statusLineMock = mock(StatusLine.class);
+ when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+ when(statusLineMock.getStatusCode()).thenReturn(responseCode);
+ when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
+ InputStream stream = new ByteArrayInputStream(content.getBytes());
+ when(httpEntityMock.getContent()).thenReturn(stream);
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 8c4b3891..d781cea3 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -19,6 +19,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -26,9 +27,12 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -43,6 +47,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -56,6 +61,7 @@ public class ScheduledTasksTest {
private int uniqueValue = 0;
private DMaaPMessageConsumerTask consumerMock;
+ private PublishedChecker publishedCheckerMock;
private FileCollector fileCollectorMock;
private DataRouterPublisher dataRouterMock;
@@ -75,13 +81,15 @@ public class ScheduledTasksTest {
.keyStorePasswordPath("keyStorePasswordPath") //
.enableDmaapCertAuth(true) //
.build(); //
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
consumerMock = mock(DMaaPMessageConsumerTask.class);
+ publishedCheckerMock = mock(PublishedChecker.class);
fileCollectorMock = mock(FileCollector.class);
dataRouterMock = mock(DataRouterPublisher.class);
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
doReturn(fileCollectorMock).when(testedObject).createFileCollector();
doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
}
@@ -146,7 +154,7 @@ public class ScheduledTasksTest {
.timeZoneOffset("") //
.name("") //
.location("") //
- .internalLocation("internalLocation") //
+ .internalLocation(Paths.get("internalLocation")) //
.compression("") //
.fileFormatType("") //
.fileFormatVersion("") //
@@ -174,6 +182,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
@@ -197,6 +207,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
Mono<Object> error = Mono.error(new Exception("problem"));
@@ -228,6 +240,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
@@ -260,6 +274,8 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
index 0eaa7a17..5e08efc7 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -30,6 +30,6 @@ public class DatafileTaskException extends Exception {
}
public DatafileTaskException(String message, Exception e) {
- super(message + e);
+ super(message, e);
}
}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
index 9f3a3188..f115dba7 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -20,15 +20,44 @@ package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import java.lang.reflect.Type;
+import java.nio.file.Path;
+/**
+ * Helper class to serialize object.
+ */
public class CommonFunctions {
- private static Gson gson = new GsonBuilder().serializeNulls().create();
+ private static Gson gson =
+ new GsonBuilder().registerTypeHierarchyAdapter(Path.class, new PathConverter()).serializeNulls().create();
- private CommonFunctions() {}
+ private CommonFunctions() {
+ }
+ /**
+ * Serializes a <code>ConsumerDmaapModel</code>.
+ *
+ * @param consumerDmaapModel model to serialize.
+ *
+ * @return a string with the serialized model.
+ */
public static String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
return gson.toJson(consumerDmaapModel);
}
-} \ No newline at end of file
+
+ /**
+ * Json serializer that handles Path serializations, since <code>Path</code> does not implement the
+ * <code>Serializable</code> interface.
+ */
+ public static class PathConverter implements JsonSerializer<Path> {
+ @Override
+ public JsonElement serialize(Path path, Type type, JsonSerializationContext jsonSerializationContext) {
+ return new JsonPrimitive(path.toString());
+ }
+ }
+}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
index 972316bf..2337485a 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
@@ -17,6 +17,9 @@
package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.annotations.SerializedName;
+
+import java.nio.file.Path;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@@ -55,7 +58,7 @@ public interface ConsumerDmaapModel extends DmaapModel {
String getLocation();
@SerializedName("internalLocation")
- String getInternalLocation();
+ Path getInternalLocation();
@SerializedName("compression")
String getCompression();
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
index cbc3e122..25f0dbfc 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
@@ -1,58 +1,51 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.model;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-import org.junit.jupiter.api.Test;
+import java.nio.file.Paths;
-class CommonFunctionsTest {
- // @formatter:off
- private ConsumerDmaapModel model = ImmutableConsumerDmaapModel.builder()
- .productName("NrRadio")
- .vendorName("Ericsson")
- .lastEpochMicrosec("8745745764578")
- .sourceName("oteNB5309")
- .startEpochMicrosec("8745745764578")
- .timeZoneOffset("UTC+05:00")
- .name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz")
- .internalLocation("target/A20161224.1030-1045.bin.gz")
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
- .build();
+import org.junit.jupiter.api.Test;
- private static final String EXPECTED_RESULT =
- "{\"productName\":\"NrRadio\","
- + "\"vendorName\":\"Ericsson\","
- + "\"lastEpochMicrosec\":\"8745745764578\","
- + "\"sourceName\":\"oteNB5309\","
- + "\"startEpochMicrosec\":\"8745745764578\","
- + "\"timeZoneOffset\":\"UTC+05:00\","
- + "\"name\":\"A20161224.1030-1045.bin.gz\","
- + "\"location\":\"ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz\","
- + "\"internalLocation\":\"target/A20161224.1030-1045.bin.gz\","
- + "\"compression\":\"gzip\","
- + "\"fileFormatType\":\"org.3GPP.32.435#measCollec\","
- + "\"fileFormatVersion\":\"V10\"}";
- // @formatter:on
+public class CommonFunctionsTest {
@Test
- void createJsonBody_shouldReturnJsonInString() {
- assertEquals(EXPECTED_RESULT, CommonFunctions.createJsonBody(model));
+ public void createJsonBody_success() {
+ ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel //
+ .builder() //
+ .productName("") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .name("") //
+ .location("") //
+ .internalLocation(Paths.get("internalLocation")) //
+ .compression("") //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .build();
+ String actualBody = CommonFunctions.createJsonBody(consumerDmaapModel);
+
+ assertTrue(actualBody.contains("\"internalLocation\":\"internalLocation\""));
}
-} \ No newline at end of file
+}
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
index 2c5e701d..0c1ac436 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
@@ -16,6 +16,9 @@
package org.onap.dcaegen2.collectors.datafile.model;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -28,29 +31,27 @@ public class ConsumerDmaapModelTest {
private static final String TIME_ZONE_OFFSET = "UTC+05:00";
private static final String NAME = "A20161224.1030-1045.bin.gz";
private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz";
- private static final String INTERNAL_LOCATION = "target/A20161224.1030-1045.bin.gz";
+ private static final Path INTERNAL_LOCATION = Paths.get("target/A20161224.1030-1045.bin.gz");
private static final String COMPRESSION = "gzip";
private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
@Test
public void consumerDmaapModelBuilder_shouldBuildAnObject() {
- // @formatter:off
- ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(NAME)
- .location(LOCATION)
- .internalLocation(INTERNAL_LOCATION)
- .compression(COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(NAME) //
+ .location(LOCATION) //
+ .internalLocation(INTERNAL_LOCATION) //
+ .compression(COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- // @formatter:on
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(PRODUCT_NAME, consumerDmaapModel.getProductName());
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 1bf3ec5a..4283debf 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -67,7 +67,7 @@ public class FtpsClient implements FileCollectClient {
} catch (DatafileTaskException e) {
throw e;
} catch (Exception e) {
- throw new DatafileTaskException("Could not open connection: ", e);
+ throw new DatafileTaskException("Could not open connection: " + e, e);
}
}
@@ -100,7 +100,7 @@ public class FtpsClient implements FileCollectClient {
throw new DatafileTaskException("Could not retrieve file " + remoteFileName);
}
} catch (IOException e) {
- throw new DatafileTaskException("Could not fetch file: ", e);
+ throw new DatafileTaskException("Could not fetch file: " + e, e);
}
logger.trace("collectFile fetched: {}", localFileName);
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index 2f489166..dec8af42 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -79,7 +79,7 @@ public class SftpClient implements FileCollectClient {
sftpChannel = getChannel(session);
}
} catch (JSchException e) {
- throw new DatafileTaskException("Could not open Sftp client", e);
+ throw new DatafileTaskException("Could not open Sftp client" + e, e);
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
new file mode 100644
index 00000000..e01a941b
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class HttpAsyncClientBuilderWrapper implements IHttpAsyncClientBuilder {
+ HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
+
+ @Override
+ public IHttpAsyncClientBuilder setRedirectStrategy(RedirectStrategy redirectStrategy) {
+ builder.setRedirectStrategy(redirectStrategy);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setSSLContext(SSLContext sslcontext) {
+ builder.setSSLContext(sslcontext);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setSSLHostnameVerifier(HostnameVerifier hostnameVerifier) {
+ builder.setSSLHostnameVerifier(hostnameVerifier);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setDefaultRequestConfig(RequestConfig config) {
+ builder.setDefaultRequestConfig(config);
+ return this;
+ }
+
+ @Override
+ public CloseableHttpAsyncClient build() {
+ return builder.build();
+ }
+
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java
new file mode 100644
index 00000000..e0a51a80
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface IHttpAsyncClientBuilder {
+ public IHttpAsyncClientBuilder setRedirectStrategy(final RedirectStrategy redirectStrategy);
+
+ public IHttpAsyncClientBuilder setSSLContext(final SSLContext sslcontext);
+
+ public IHttpAsyncClientBuilder setSSLHostnameVerifier(final HostnameVerifier hostnameVerifier);
+
+ public IHttpAsyncClientBuilder setDefaultRequestConfig(final RequestConfig config);
+
+ public CloseableHttpAsyncClient build();
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
deleted file mode 100644
index 5295b124..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-import org.springframework.core.io.FileSystemResource;
-
-public class FileSystemResourceWrapper implements IFileSystemResource {
- private FileSystemResource realResource;
-
- @Override
- public void setPath(Path path) {
- realResource = new FileSystemResource(path);
- }
- @Override
- public InputStream getInputStream() throws IOException {
- return realResource.getInputStream();
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
deleted file mode 100644
index 23f14a33..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-
-public interface IFileSystemResource {
-
- public void setPath(Path filePath);
-
- public InputStream getInputStream() throws IOException;
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 9304688f..9bd5d57f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -16,54 +16,34 @@
package org.onap.dcaegen2.collectors.datafile.service.producer;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.Future;
import javax.net.ssl.SSLContext;
import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.ssl.SSLContextBuilder;
-import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
-import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
+import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import reactor.core.publisher.Mono;
+import org.springframework.web.util.UriBuilder;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -71,11 +51,7 @@ import reactor.core.publisher.Mono;
*/
public class DmaapProducerReactiveHttpClient {
- private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
- private static final String NAME_JSON_TAG = "name";
- private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
- private static final String URI_SEPARATOR = "/";
- private static final String DEFAULT_FEED_ID = "1";
+ private static final int NO_REQUEST_TIMEOUT = -1;
private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
@@ -83,14 +59,10 @@ public class DmaapProducerReactiveHttpClient {
private final String dmaapHostName;
private final Integer dmaapPortNumber;
- private final String dmaapTopicName;
private final String dmaapProtocol;
- private final String dmaapContentType;
private final String user;
private final String pwd;
- private IFileSystemResource fileResource = new FileSystemResourceWrapper();
-
/**
* Constructor DmaapProducerReactiveHttpClient.
*
@@ -99,96 +71,86 @@ public class DmaapProducerReactiveHttpClient {
public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
- this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
- this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
this.user = dmaapPublisherConfiguration.dmaapUserName();
this.pwd = dmaapPublisherConfiguration.dmaapUserPassword();
}
- /**
- * Function for calling DMaaP HTTP producer - post request to DMaaP DataRouter.
- *
- * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
- * @return status code of operation
- */
- public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel,
- Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- try (CloseableHttpAsyncClient webClient = createWebClient()) {
-
- HttpPut put = new HttpPut();
- prepareHead(consumerDmaapModel, put);
- prepareBody(consumerDmaapModel, put);
- addUserCredentialsToHead(put);
-
- logger.trace(INVOKE, "Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
- Future<HttpResponse> future = webClient.execute(put, null);
+ public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap)
+ throws DatafileTaskException {
+ try (CloseableHttpAsyncClient webClient = createWebClient(true, NO_REQUEST_TIMEOUT)) {
+ MdcVariables.setMdcContextMap(contextMap);
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
HttpResponse response = future.get();
- logger.trace(INVOKE_RETURN, "Response from DR {}", response);
- return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+ return response;
} catch (Exception e) {
- logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
- return Mono.error(e);
+ throw new DatafileTaskException("Unable to create web client.", e);
}
}
- private void addUserCredentialsToHead(HttpPut put) {
+ public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, int requestTimeout,
+ Map<String, String> contextMap) throws DatafileTaskException {
+ try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) {
+ MdcVariables.setMdcContextMap(contextMap);
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
+ HttpResponse response = future.get();
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+ return response;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to create web client.", e);
+ }
+ }
+
+ public void addUserCredentialsToHead(HttpUriRequest request) {
String plainCreds = user + ":" + pwd;
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
String base64Creds = new String(base64CredsBytes);
logger.trace("base64Creds...: {}", base64Creds);
- put.addHeader("Authorization", "Basic " + base64Creds);
+ request.addHeader("Authorization", "Basic " + base64Creds);
}
- private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
- put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
- String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
- metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
- put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getUri(name));
-
- String requestID = MDC.get(REQUEST_ID);
- put.addHeader(X_ONAP_REQUEST_ID, requestID);
- String invocationID = UUID.randomUUID().toString();
- put.addHeader(X_INVOCATION_ID, invocationID);
+ public UriBuilder getBaseUri() {
+ return new DefaultUriBuilderFactory().builder() //
+ .scheme(dmaapProtocol) //
+ .host(dmaapHostName) //
+ .port(dmaapPortNumber);
}
- private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
- Path fileLocation = Paths.get(model.getInternalLocation());
- this.fileResource.setPath(fileLocation);
- InputStream fileInputStream = fileResource.getInputStream();
-
- put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, int requestTimeout)
+ throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ SSLContext sslContext =
+ new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
- }
+ IHttpAsyncClientBuilder clientBuilder = getHttpClientBuilder();
+ clientBuilder.setSSLContext(sslContext) //
+ .setSSLHostnameVerifier(new NoopHostnameVerifier());
- private URI getUri(String fileName) {
- String path = dmaapTopicName + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + fileName;
- return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
- .path(path).build();
- }
+ if (expectRedirect) {
+ clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ }
- void setFileSystemResource(IFileSystemResource fileSystemResource) {
- fileResource = fileSystemResource;
- }
+ if (requestTimeout > NO_REQUEST_TIMEOUT) {
+ RequestConfig requestConfig = RequestConfig.custom() //
+ .setSocketTimeout(requestTimeout) //
+ .setConnectTimeout(requestTimeout) //
+ .setConnectionRequestTimeout(requestTimeout) //
+ .build();
- protected CloseableHttpAsyncClient createWebClient()
- throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ clientBuilder.setDefaultRequestConfig(requestConfig);
+ }
- SSLContext sslContext = new SSLContextBuilder() //
- .loadTrustMaterial(null, (certificate, authType) -> true) //
- .build();
-
- CloseableHttpAsyncClient webClient = HttpAsyncClients.custom() //
- .setSSLContext(sslContext) //
- .setSSLHostnameVerifier(new NoopHostnameVerifier()) //
- .setRedirectStrategy(PublishRedirectStrategy.INSTANCE) //
- .build();
- webClient.start();
- return webClient;
+ return clientBuilder.build();
}
+ IHttpAsyncClientBuilder getHttpClientBuilder() {
+ return new HttpAsyncClientBuilderWrapper();
+ }
} \ No newline at end of file
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index 06ff570c..91c4c334 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -16,51 +16,44 @@
package org.onap.dcaegen2.collectors.datafile.service.producer;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
import org.apache.http.HttpResponse;
-import org.apache.http.StatusLine;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -68,32 +61,24 @@ import reactor.test.StepVerifier;
*/
class DmaapProducerReactiveHttpClientTest {
- private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
- private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
- private static final String NAME_JSON_TAG = "name";
- private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
-
private static final String HOST = "54.45.33.2";
private static final String HTTPS_SCHEME = "https";
private static final int PORT = 1234;
- private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
- private static final String URI_SEPARATOR = "/";
- private static final String PUBLISH_TOPIC = "publish";
- private static final String DEFAULT_FEED_ID = "1";
- private static final String FILE_CONTENT = "Just a string.";
+ private static final String USER_NAME = "dradmin";
+ private static final int TWO_SECOND_TIMEOUT = 2000;
- private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+
+ private DmaapProducerReactiveHttpClient producerClientUnderTestSpy;
private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
- private ConsumerDmaapModel consumerDmaapModel;
- private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class);
+ private IHttpAsyncClientBuilder clientBuilderMock;
+
private CloseableHttpAsyncClient clientMock;
- private HttpResponse responseMock = mock(HttpResponse.class);
@SuppressWarnings("unchecked")
private Future<HttpResponse> futureMock = mock(Future.class);
- private StatusLine statusLine = mock(StatusLine.class);
- private InputStream fileStream;
@BeforeEach
void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
@@ -102,82 +87,114 @@ class DmaapProducerReactiveHttpClientTest {
when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin");
when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin");
- when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn(APPLICATION_OCTET_STREAM_CONTENT_TYPE);
- when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn(PUBLISH_TOPIC);
-
- // @formatter:off
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName("NrRadio")
- .vendorName("Ericsson")
- .lastEpochMicrosec("8745745764578")
- .sourceName("oteNB5309")
- .startEpochMicrosec("8745745764578")
- .timeZoneOffset("UTC+05:00")
- .name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz")
- .internalLocation("target/A20161224.1030-1045.bin.gz")
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
- .build();
- //formatter:on
-
- dmaapProducerReactiveHttpClient = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock));
- dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock);
+
+ producerClientUnderTestSpy = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock));
+
+ clientBuilderMock = mock(IHttpAsyncClientBuilder.class);
clientMock = mock(CloseableHttpAsyncClient.class);
- doReturn(clientMock).when(dmaapProducerReactiveHttpClient).createWebClient();
}
@Test
- void getHttpResponse_Success() throws Exception {
- mockWebClientDependantObject();
+ void getHttpResponseWithRederict_Success() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setSSLHostnameVerifier(any(NoopHostnameVerifier.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+ HttpResponse responseMock = mock(HttpResponse.class);
+ when(futureMock.get()).thenReturn(responseMock);
- HttpPut httpPut = new HttpPut();
- httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
+ HttpGet request = new HttpGet();
+ producerClientUnderTestSpy.getDmaapProducerResponseWithRedirect(request, CONTEXT_MAP);
- URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
- .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
- httpPut.setURI(expectedUri);
+ verify(clientBuilderMock).setSSLContext(any(SSLContext.class));
+ verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
+ verify(clientBuilderMock).setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ verify(clientBuilderMock).build();
+ verifyNoMoreInteractions(clientBuilderMock);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
- metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
- metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
- httpPut.addHeader(X_DMAAP_DR_META, metaData.toString());
+ verify(clientMock).start();
+ verify(clientMock).close();
- String plainCreds = "dradmin" + ":" + "dradmin";
- byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
- byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
- String base64Creds = new String(base64CredsBytes);
- httpPut.addHeader("Authorization", "Basic " + base64Creds);
+ verify(futureMock).get();
+ verifyNoMoreInteractions(futureMock);
+ }
- fileStream.reset();
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap))
- .expectNext(HttpStatus.OK).verifyComplete();
+ @Test
+ void getHttpResponseWithCustomTimeout_Success() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+ HttpResponse responseMock = mock(HttpResponse.class);
+ when(futureMock.get()).thenReturn(responseMock);
- verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME));
- InputStream fileInputStream = fileSystemResourceMock.getInputStream();
- httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ HttpGet request = new HttpGet();
+ producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT, CONTEXT_MAP);
+
+ ArgumentCaptor<RequestConfig> requestConfigCaptor = ArgumentCaptor.forClass(RequestConfig.class);
+ verify(clientBuilderMock).setSSLContext(any(SSLContext.class));
+ verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
+ verify(clientBuilderMock).setDefaultRequestConfig(requestConfigCaptor.capture());
+ RequestConfig requestConfig = requestConfigCaptor.getValue();
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getSocketTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectionRequestTimeout());
+ verify(clientBuilderMock).build();
+ verifyNoMoreInteractions(clientBuilderMock);
+
+ verify(clientMock).start();
+ verify(clientMock).close();
+
+ verify(futureMock).get();
+ verifyNoMoreInteractions(futureMock);
}
@Test
- void getHttpResponse_Fail() throws Exception {
- Map<String, String> contextMap = new HashMap<>();
- doReturn(futureMock).when(clientMock).execute(any(), any());
- doThrow(new InterruptedException()).when(futureMock).get();
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap)) //
- .expectError() //
- .verify(); //
+ public void getResponseWithException_throwsException() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ HttpPut request = new HttpPut();
+ when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+
+ try {
+ when(futureMock.get()).thenThrow(new InterruptedException("Interrupted"));
+
+ producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT,
+ CONTEXT_MAP);
+
+ fail("Should have got an exception.");
+ } catch (DatafileTaskException e) {
+ assertTrue(e.getCause() instanceof InterruptedException);
+ assertEquals("Interrupted", e.getCause().getMessage());
+ } catch (Exception e) {
+ fail("Wrong exception");
+ }
+
+ verify(clientMock).start();
+ verify(clientMock).close();
}
- private void mockWebClientDependantObject()
- throws IOException, InterruptedException, ExecutionException {
- fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
- when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream);
- when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
- when(futureMock.get()).thenReturn(responseMock);
- when(responseMock.getStatusLine()).thenReturn(statusLine);
- when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK);
+ @Test
+ public void addCredentialsToHead_success() {
+ HttpPut request = new HttpPut();
+
+ producerClientUnderTestSpy.addUserCredentialsToHead(request);
+ String plainCreds = USER_NAME + ":" + USER_NAME;
+ byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
+ byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
+ String base64Creds = "Basic " + new String(base64CredsBytes);
+ Header[] authorizationHeaders = request.getHeaders("Authorization");
+ assertEquals(base64Creds, authorizationHeaders[0].getValue());
+ }
+
+ @Test
+ public void getBaseUri_success() {
+ URI uri = producerClientUnderTestSpy.getBaseUri().build();
+ assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString());
}
}