aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore4
-rw-r--r--INFO.yaml34
-rw-r--r--datafile-app-server/config/application.yaml2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java12
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java43
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java31
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java27
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java90
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java19
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java28
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java131
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java71
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java120
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java166
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java (renamed from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java)0
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java87
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java16
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java522
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java207
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java231
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java34
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java175
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java52
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java2
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java35
-rw-r--r--datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java5
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java87
-rw-r--r--datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java33
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java1
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java53
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java7
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java67
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java43
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java37
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java28
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java176
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java33
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java215
39 files changed, 1729 insertions, 1199 deletions
diff --git a/.gitignore b/.gitignore
index d03f9a95..37707c82 100644
--- a/.gitignore
+++ b/.gitignore
@@ -45,7 +45,7 @@ buildNumber.properties
.mvn/timing.properties
.mvn/wrapper/maven-wrapper.jar
-# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
-hs_err_pid*
+# CheckStyle files
+.checkstyle
opt/ \ No newline at end of file
diff --git a/INFO.yaml b/INFO.yaml
index b0430d13..f7ed9638 100644
--- a/INFO.yaml
+++ b/INFO.yaml
@@ -28,32 +28,7 @@ meetings:
repeats: 'weekly'
time: '13:00 UTC (DST), 15:30 UTC (post DST)'
repositories:
- - 'dcaegen2'
- - 'dcaegen2-analytics'
- - 'dcaegen2-analytics-flink'
- - 'dcaegen2-analytics-pnda'
- - 'dcaegen2-analytics-tca'
- - 'dcaegen2-analytics-tca-gen2'
- - 'dcaegen2-collectors'
- 'dcaegen2-collectors-datafile'
- - 'dcaegen2-collectors-hv-ves'
- - 'dcaegen2-collectors-snmptrap'
- - 'dcaegen2-collectors-ves'
- - 'dcaegen2-deployments'
- - 'dcaegen2-platform-blueprints'
- - 'dcaegen2-platform-cdapbroker'
- - 'dcaegen2-platform-cli'
- - 'dcaegen2-platform-configbinding'
- - 'dcaegen2-platform-deployment-handler'
- - 'dcaegen2-platform-inventory-api'
- - 'dcaegen2-platform-plugins'
- - 'dcaegen2-platform-policy-handler'
- - 'dcaegen2-platform-registrator'
- - 'dcaegen2-platform-servicechange-handler'
- - 'dcaegen2-services-heartbeat'
- - 'dcaegen2-services-mapper'
- - 'dcaegen2-services-pm-mapper'
- - 'dcaegen2-services-prh'
committers:
- <<: *onap_dcaegen2_ptl
- name: 'Lusheng Ji'
@@ -86,5 +61,14 @@ committers:
company: 'ATT'
id: 'jflucas'
timezone: 'America/New_York'
+ - name: 'Henrik Andersson'
+ email: 'henrik.b.andersson@est.tech'
+ company: 'EST'
+ id: 'elinuxhenrik'
+ timezone: 'Europe/Stockholm'
tsc:
approval: 'https://lists.onap.org/pipermail/onap-tsc'
+ changes:
+ - type: 'Addition'
+ name: 'Henrik Andersson'
+ link: 'https://lists.onap.org/g/onap-tsc/message/4771'
diff --git a/datafile-app-server/config/application.yaml b/datafile-app-server/config/application.yaml
index f2538578..8985b9b8 100644
--- a/datafile-app-server/config/application.yaml
+++ b/datafile-app-server/config/application.yaml
@@ -15,7 +15,7 @@ logging:
org.springframework: ERROR
org.springframework.data: ERROR
org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
- org.onap.dcaegen2.collectors.datafile: ERROR
+ org.onap.dcaegen2.collectors.datafile: WARN
file: /var/log/ONAP/application.log
app:
filepath: /opt/app/datafile/config/datafile_endpoints.json
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 82c390f7..a30d2826 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -42,6 +42,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.stereotype.Component;
/**
+ * Holds all configuration for the DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -79,8 +81,10 @@ public class AppConfig {
return ftpesConfiguration;
}
+ /**
+ * Reads the configuration from file.
+ */
public void loadConfigurationFromFile() {
-
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
JsonParser parser = new JsonParser();
@@ -108,10 +112,8 @@ public class AppConfig {
setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
}
- } catch (IOException e) {
- logger.error("Problem with file loading, file: {}", filepath, e);
- } catch (JsonSyntaxException e) {
- logger.error("Problem with Json deserialization", e);
+ } catch (JsonSyntaxException | IOException e) {
+ logger.error("Problem with loading configuration, file: {}", filepath, e);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index b4dc6353..d5c8b3b2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.apache.commons.lang3.StringUtils;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
@@ -40,11 +43,15 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
+ * Api for starting and stopping DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Configuration
@EnableScheduling
@@ -56,18 +63,25 @@ public class SchedulerConfig {
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
private static final Marker EXIT = MarkerFactory.getMarker("EXIT");
- private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
+ private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private Map<String, String> contextMap;
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
private final CloudConfiguration cloudConfiguration;
+ /**
+ * Constructor.
+ *
+ * @param taskScheduler The scheduler used to schedule the tasks.
+ * @param scheduledTasks The scheduler that will actually handle the tasks.
+ * @param cloudConfiguration The DFC configuration.
+ */
@Autowired
- public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask,
- CloudConfiguration cloudConfiguration) {
+ public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks,
+ CloudConfiguration cloudConfiguration) {
this.taskScheduler = taskScheduler;
- this.scheduledTask = scheduledTask;
+ this.scheduledTask = scheduledTasks;
this.cloudConfiguration = cloudConfiguration;
}
@@ -83,8 +97,7 @@ public class SchedulerConfig {
MdcVariables.setMdcContextMap(contextMap);
logger.info(EXIT, "Stopped Datafile workflow");
MDC.clear();
- return Mono.defer(() -> Mono
- .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+ return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED));
}
/**
@@ -106,12 +119,14 @@ public class SchedulerConfig {
contextMap = MDC.getCopyOfContextMap();
logger.info(ENTRY, "Start scheduling Datafile workflow");
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(),
- SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
- SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
- SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap),
+ Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
+ scheduledFutureList.add(
+ taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList
+ .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
+ SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
return true;
} else {
@@ -119,4 +134,4 @@ public class SchedulerConfig {
}
}
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
deleted file mode 100644
index 36279016..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.exceptions;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- */
-public class DmaapNotFoundException extends DatafileTaskException {
-
- private static final long serialVersionUID = 1L;
-
- public DmaapNotFoundException(String message) {
- super(message);
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index bdb47b2b..d0d1f91a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -37,10 +37,19 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
public abstract class FileData {
private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
+ /**
+ * @return the file name with no path
+ */
public abstract String name();
+ /**
+ * @return the URL to use to fetch the file from the PNF
+ */
public abstract String location();
+ /**
+ * @return the file transfer protocol to use for fetching the file
+ */
public abstract Scheme scheme();
public abstract String compression();
@@ -49,17 +58,25 @@ public abstract class FileData {
public abstract String fileFormatVersion();
+ public abstract MessageMetaData messageMetaData();
+
+ /**
+ * @return the name of the PNF, must be unique in the network
+ */
+ public String sourceName() {
+ return messageMetaData().sourceName();
+ }
+
public String remoteFilePath() {
return URI.create(location()).getPath();
}
public Path getLocalFileName() {
- URI uri = URI.create(location());
- return createLocalFileName(uri.getHost(), name());
+ return createLocalFileName(messageMetaData().sourceName(), name());
}
- public static Path createLocalFileName(String host, String fileName) {
- return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName);
+ public static Path createLocalFileName(String sourceName, String fileName) {
+ return Paths.get(DATAFILE_TMPDIR, sourceName + "_" + fileName);
}
public FileServerData fileServerData() {
@@ -86,4 +103,4 @@ public abstract class FileData {
}
return Optional.empty();
}
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index e3293faa..9373a4f2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -31,9 +31,5 @@ import org.immutables.value.Value;
@Value.Immutable
@Gson.TypeAdapters
public interface FileReadyMessage {
- public String pnfName();
-
- public MessageMetaData messageMetaData();
-
public List<FileData> files();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index a8f79ea1..a3595ecf 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -88,15 +88,25 @@ public class JsonMessageParser {
}
}
+ /**
+ * Parses the Json message and returns a stream of messages.
+ *
+ * @param rawMessage the Json message to parse.
+ * @return a <code>Flux</code> containing messages.
+ */
public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
}
- public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+ Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
JsonParser jsonParser = new JsonParser();
- return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
- : element.isJsonObject() ? Optional.of((JsonObject) element)
- : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ if (element.isJsonPrimitive()) {
+ return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject());
+ } else if (element.isJsonObject()) {
+ return Optional.of((JsonObject) element);
+ } else {
+ return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ }
}
private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
@@ -127,34 +137,32 @@ public class JsonMessageParser {
: logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
+
private Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
+ MessageMetaData messageMetaData = optionalMessageMetaData.get();
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (arrayOfNamedHashMap != null) {
- List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
+ List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap, messageMetaData);
if (!allFileDataFromJson.isEmpty()) {
- MessageMetaData messageMetaData = optionalMessageMetaData.get();
- // @formatter:off
- return Mono.just(ImmutableFileReadyMessage.builder()
- .pnfName(messageMetaData.sourceName())
- .messageMetaData(messageMetaData)
- .files(allFileDataFromJson)
+ return Mono.just(ImmutableFileReadyMessage.builder() //
+ .files(allFileDataFromJson) //
.build());
- // @formatter:on
} else {
return Mono.empty();
}
}
- logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
+ logger.error("VES event parsing. Missing arrayOfNamedHashMap in message. {}", message);
return Mono.empty();
}
- logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
+ logger.error("VES event parsing. FileReady event has incorrect JsonObject. {}", message);
return Mono.empty();
}
+
private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
@@ -168,22 +176,20 @@ public class JsonMessageParser {
// version.
getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
- // @formatter:off
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
- .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
- .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
- .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues))
- .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues))
- .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues))
- .changeIdentifier(changeIdentifier)
- .changeType(changeType)
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) //
+ .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) //
+ .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) //
+ .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) //
+ .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) //
+ .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) //
+ .changeIdentifier(changeIdentifier) //
+ .changeType(changeType) //
.build();
- // @formatter:on
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
- String errorMessage = "Unable to collect file from xNF.";
+ String errorMessage = "VES event parsing.";
if (!missingValues.isEmpty()) {
errorMessage += " Missing data: " + missingValues;
}
@@ -204,11 +210,11 @@ public class JsonMessageParser {
return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
}
- private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) {
+ private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
- Optional<FileData> fileData = getFileDataFromJson(fileInfo);
+ Optional<FileData> fileData = getFileDataFromJson(fileInfo, messageMetaData);
if (fileData.isPresent()) {
res.add(fileData.get());
@@ -217,7 +223,7 @@ public class JsonMessageParser {
return res;
}
- private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) {
+ private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
@@ -228,30 +234,28 @@ public class JsonMessageParser {
try {
scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
} catch (Exception e) {
- logger.error("Unable to collect file from xNF.", e);
+ logger.error("VES event parsing.", e);
return Optional.empty();
}
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .name(getValueFromJson(fileInfo, NAME, missingValues))
- .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
- .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
- .location(location)
- .scheme(scheme)
- .compression(getValueFromJson(data, COMPRESSION, missingValues))
+ FileData fileData = ImmutableFileData.builder() //
+ .name(getValueFromJson(fileInfo, NAME, missingValues)) //
+ .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) //
+ .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) //
+ .location(location) //
+ .scheme(scheme) //
+ .compression(getValueFromJson(data, COMPRESSION, missingValues)) //
+ .messageMetaData(messageMetaData)
.build();
- // @formatter:on
if (missingValues.isEmpty()) {
return Optional.of(fileData);
}
- logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}",
- missingValues, fileInfo);
+ logger.error("VES event parsing. File information wrong. Missing data: {} Data: {}", missingValues, fileInfo);
return Optional.empty();
}
/**
- * Gets data from the event name, defined as:
- * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
+ * Gets data from the event name.
+ * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
* Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 2cb84112..e2dca182 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -13,6 +13,7 @@
* the License.
* ============LICENSE_END========================================================================
*/
+
package org.onap.dcaegen2.collectors.datafile.service;
import java.nio.file.Path;
@@ -29,14 +30,30 @@ import java.util.Map;
public class PublishedFileCache {
private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
+ /**
+ * Adds a file to the cache.
+ *
+ * @param path the name of the file to add.
+ * @return <code>null</code> if the file is not already in the cache.
+ */
public Instant put(Path path) {
return publishedFiles.put(path, Instant.now());
}
+ /**
+ * Removes a file from the cache.
+ *
+ * @param localFileName name of the file to remove.
+ */
public void remove(Path localFileName) {
publishedFiles.remove(localFileName);
}
+ /**
+ * Removes files 24 hours older than the given instant.
+ *
+ * @param now the instant will determine which files that will be purged.
+ */
public void purge(Instant now) {
for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) {
Map.Entry<Path, Instant> pair = it.next();
@@ -46,7 +63,7 @@ public class PublishedFileCache {
}
}
- public int size() {
+ int size() {
return publishedFiles.size();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
index f6daf733..49e2f01e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
@@ -30,8 +30,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
+
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -39,25 +40,21 @@ import reactor.core.publisher.Flux;
public class DMaaPMessageConsumerTask {
private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
- private AppConfig datafileAppConfig;
- private JsonMessageParser jsonMessageParser;
- private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
+ private final JsonMessageParser jsonMessageParser;
+ private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
- this.datafileAppConfig = datafileAppConfig;
this.jsonMessageParser = new JsonMessageParser();
+ this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
- protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig,
- DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+ protected DMaaPMessageConsumerTask(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
JsonMessageParser messageParser) {
- this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
this.jsonMessageParser = messageParser;
}
public Flux<FileReadyMessage> execute() {
- dmaaPConsumerReactiveHttpClient = resolveClient();
logger.trace("execute called");
return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
@@ -67,15 +64,10 @@ public class DMaaPMessageConsumerTask {
return jsonMessageParser.getMessagesFromJson(message);
}
- protected DmaapConsumerConfiguration resolveConfiguration() {
- return datafileAppConfig.getDmaapConsumerConfiguration();
+ private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
+ DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
+ WebClient client = new DmaapReactiveWebClient().fromConfiguration(config).build();
+ return new DMaaPConsumerReactiveHttpClient(config, client);
}
- protected DMaaPConsumerReactiveHttpClient resolveClient() {
- return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
- }
-
- protected WebClient buildWebClient() {
- return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
- }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 57edc364..4207d1fc 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,24 +1,46 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
@@ -26,17 +48,30 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+
import reactor.core.publisher.Mono;
/**
+ * Publishes a file to the DataRouter.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class DataRouterPublisher {
+ private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+ private static final String CONTENT_TYPE = "application/octet-stream";
+ private static final String NAME_JSON_TAG = "name";
+ private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
+ private static final String PUBLISH_TOPIC = "publish";
+ private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
+ private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -44,25 +79,71 @@ public class DataRouterPublisher {
/**
- * Publish one file
- * @param consumerDmaapModel information about the file to publish
- * @param maxNumberOfRetries the maximal number of retries if the publishing fails
- * @param firstBackoffTimeout the time to delay the first retry
- * @return the HTTP response status as a string
+ * Publish one file.
+ *
+ * @param model information about the file to publish
+ * @param numRetries the maximal number of retries if the publishing fails
+ * @param firstBackoff the time to delay the first retry
+ * @param contextMap tracing context variables
+ * @return the (same) ConsumerDmaapModel
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.trace("Method called with arg {}", model);
- DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
+ logger.trace("Publish called with arg {}", model);
+ dmaapProducerReactiveHttpClient = resolveClient();
- //@formatter:off
return Mono.just(model)
.cache()
- .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap))
- .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap))
+ .flatMap(m -> publishFile(m, contextMap)) //
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
.retryBackoff(numRetries, firstBackoff);
- //@formatter:on
+ }
+
+ private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) {
+ logger.trace("Entering publishFile with {}", consumerDmaapModel);
+ try {
+ HttpPut put = new HttpPut();
+ String requestId = MDC.get(REQUEST_ID);
+ put.addHeader(X_ONAP_REQUEST_ID, requestId);
+ String invocationId = UUID.randomUUID().toString();
+ put.addHeader(X_INVOCATION_ID, invocationId);
+
+ prepareHead(consumerDmaapModel, put);
+ prepareBody(consumerDmaapModel, put);
+ dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+
+ HttpResponse response =
+ dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
+ logger.trace(response.toString());
+ return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+ } catch (Exception e) {
+ logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ return Mono.error(e);
+ }
+ }
+
+ private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+ put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+ metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
+ put.addHeader(X_DMAAP_DR_META, metaData.toString());
+ put.setURI(getPublishUri(model.getInternalLocation().getFileName().toString()));
+ }
+
+ private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ Path fileLocation = model.getInternalLocation();
+ try (InputStream fileInputStream = createInputStream(fileLocation)) {
+ put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ }
+ }
+
+ private URI getPublishUri(String fileName) {
+ return dmaapProducerReactiveHttpClient.getBaseUri() //
+ .pathSegment(PUBLISH_TOPIC) //
+ .pathSegment(DEFAULT_FEED_ID) //
+ .pathSegment(fileName).build();
}
private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
@@ -77,6 +158,10 @@ public class DataRouterPublisher {
}
}
+ InputStream createInputStream(Path filePath) throws IOException {
+ FileSystemResource realResource = new FileSystemResource(filePath);
+ return realResource.getInputStream();
+ }
DmaapPublisherConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapPublisherConfiguration();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index af4670e3..8849b45e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -20,6 +20,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import reactor.core.publisher.Mono;
/**
@@ -47,33 +49,31 @@ public class FileCollector {
this.datafileAppConfig = datafileAppConfig;
}
- public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
- Duration firstBackoffTimeout, Map<String, String> contextMap) {
+ public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout,
+ Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
- //@formatter:off
- return Mono.just(fileData)
- .cache()
- .flatMap(fd -> collectFile(fileData, metaData, contextMap))
- .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
- //@formatter:on
+ return Mono.just(fileData) //
+ .cache() //
+ .flatMap(fd -> collectFile(fileData, contextMap)) //
+ .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
}
- private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
- Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.trace("starting to collectFile");
+ logger.trace("starting to collectFile {}", fileData.name());
final String remoteFile = fileData.remoteFilePath();
final Path localFile = fileData.getLocalFileName();
try (FileCollectClient currentClient = createClient(fileData)) {
+ currentClient.open();
localFile.getParent().toFile().mkdir(); // Create parent directories
currentClient.collectFile(remoteFile, localFile);
- return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
+ return Mono.just(getConsumerDmaapModel(fileData, localFile));
} catch (Exception throwable) {
- logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable);
+ logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString());
return Mono.error(throwable);
}
}
@@ -89,37 +89,32 @@ public class FileCollector {
}
}
- private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
+ private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) {
String location = fileData.location();
-
- // @formatter:off
- return ImmutableConsumerDmaapModel.builder()
- .productName(metaData.productName())
- .vendorName(metaData.vendorName())
- .lastEpochMicrosec(metaData.lastEpochMicrosec())
- .sourceName(metaData.sourceName())
- .startEpochMicrosec(metaData.startEpochMicrosec())
- .timeZoneOffset(metaData.timeZoneOffset())
- .name(fileData.name())
- .location(location)
- .internalLocation(localFile.toString())
- .compression(fileData.compression())
- .fileFormatType(fileData.fileFormatType())
- .fileFormatVersion(fileData.fileFormatVersion())
+ MessageMetaData metaData = fileData.messageMetaData();
+ return ImmutableConsumerDmaapModel.builder() //
+ .productName(metaData.productName()) //
+ .vendorName(metaData.vendorName()) //
+ .lastEpochMicrosec(metaData.lastEpochMicrosec()) //
+ .sourceName(metaData.sourceName()) //
+ .startEpochMicrosec(metaData.startEpochMicrosec()) //
+ .timeZoneOffset(metaData.timeZoneOffset()) //
+ .name(fileData.name()) //
+ .location(location) //
+ .internalLocation(localFile) //
+ .compression(fileData.compression()) //
+ .fileFormatType(fileData.fileFormatType()) //
+ .fileFormatVersion(fileData.fileFormatVersion()) //
.build();
- // @formatter:on
}
- SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
- SftpClient client = new SftpClient(fileData.fileServerData());
- client.open();
- return client;
+ protected SftpClient createSftpClient(FileData fileData) {
+ return new SftpClient(fileData.fileServerData());
}
- FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException {
+ protected FtpsClient createFtpsClient(FileData fileData) {
FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
- FtpsClient client = new FtpsClient(fileData.fileServerData());
- client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword());
- return client;
+ return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(),
+ Paths.get(config.trustedCA()), config.trustedCAPassword());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
new file mode 100644
index 00000000..41f8e3cd
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -0,0 +1,120 @@
+/*-
+* ============LICENSE_START=======================================================
+* Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Bean used to check with DataRouter if a file has been published.
+ *
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedChecker {
+ private static final String FEEDLOG_TOPIC = "feedlog";
+ private static final String DEFAULT_FEED_ID = "1";
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final AppConfig appConfig;
+
+ /**
+ * Constructor.
+ *
+ * @param appConfig The DFC configuration.
+ */
+ public PublishedChecker(AppConfig appConfig) {
+ this.appConfig = appConfig;
+ }
+
+ /**
+ * Checks with DataRouter if the given file has been published already.
+ *
+ * @param fileName the name of the file used when it is published.
+ *
+ * @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
+ */
+ public boolean execute(String fileName, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ DmaapProducerReactiveHttpClient producerClient = resolveClient();
+
+ HttpGet getRequest = new HttpGet();
+ String requestId = MDC.get(REQUEST_ID);
+ getRequest.addHeader(X_ONAP_REQUEST_ID, requestId);
+ String invocationId = UUID.randomUUID().toString();
+ getRequest.addHeader(X_INVOCATION_ID, invocationId);
+ getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
+ producerClient.addUserCredentialsToHead(getRequest);
+
+ try {
+ HttpResponse response =
+ producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
+
+ logger.trace("{}", response);
+ int status = response.getStatusLine().getStatusCode();
+ HttpEntity entity = response.getEntity();
+ try (InputStream content = entity.getContent()) {
+ String body = IOUtils.toString(content);
+ return HttpStatus.SC_OK == status && !"[]".equals(body);
+ }
+ } catch (Exception e) {
+ logger.warn("Unable to check if file has been published.", e);
+ return false;
+ }
+ }
+
+ private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) {
+ return producerClient.getBaseUri() //
+ .pathSegment(FEEDLOG_TOPIC) //
+ .pathSegment(DEFAULT_FEED_ID) //
+ .queryParam("type", "pub") //
+ .queryParam("filename", fileName) //
+ .build();
+ }
+
+ protected DmaapPublisherConfiguration resolveConfiguration() {
+ return appConfig.getDmaapPublisherConfiguration();
+ }
+
+ protected DmaapProducerReactiveHttpClient resolveClient() {
+ return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 28963377..b4096c73 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,18 +18,15 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
@@ -37,6 +34,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@@ -49,25 +47,18 @@ import reactor.core.scheduler.Schedulers;
@Component
public class ScheduledTasks {
- private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
- private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
-
- /** Data needed for fetching of one file */
- private class FileCollectionData {
- final FileData fileData;
- final MessageMetaData metaData;
-
- FileCollectionData(FileData fd, MessageMetaData metaData) {
- this.fileData = fd;
- this.metaData = metaData;
- }
- }
+ private static final int NUMBER_OF_WORKER_THREADS = 100;
+ private static final int MAX_TASKS_FOR_POLLING = 50;
+ private static final long DATA_ROUTER_MAX_RETRIES = 5;
+ private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
+ private static final long FILE_TRANSFER_MAX_RETRIES = 3;
+ private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(5);
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
- private final Scheduler scheduler =
- Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS);
+ private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
/**
@@ -84,86 +75,93 @@ public class ScheduledTasks {
* Main function for scheduling for the file collection Workflow.
*/
public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.trace("Execution of tasks was registered");
- applicationConfiguration.loadConfigurationFromFile();
- createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
- () -> onComplete(contextMap));
+ try {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.trace("Execution of tasks was registered");
+ applicationConfiguration.loadConfigurationFromFile();
+ createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
+ () -> onComplete(contextMap));
+ } catch (Exception e) {
+ logger.error("Unexpected exception: ", e);
+ }
}
Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
return fetchMoreFileReadyMessages() //
- .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
+ .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
- .flatMap(this::createFileCollectionTask) //
- .filter(this::shouldBePublished) //
+ .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
+ .filter(fileData -> shouldBePublished(fileData, contextMap)) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
- .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
+ .flatMap(fileData -> fetchFile(fileData, contextMap)) //
.flatMap(model -> publishToDataRouter(model, contextMap)) //
- .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) //
+ .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
}
/**
- * called in regular intervals to remove out-dated cached information
+ * called in regular intervals to remove out-dated cached information.
*/
public void purgeCachedInformation(Instant now) {
alreadyPublishedFiles.purge(now);
}
- private void onComplete(Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Datafile tasks have been completed");
+ protected PublishedChecker createPublishedChecker() {
+ return new PublishedChecker(applicationConfiguration);
}
- private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.info("Datafile consumed tasks {}", model.getInternalLocation());
+ protected int getCurrentNumberOfTasks() {
+ return currentNumberOfTasks.get();
}
- private void onError(Throwable throwable, Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
+ protected DMaaPMessageConsumerTask createConsumerTask() {
+ return new DMaaPMessageConsumerTask(this.applicationConfiguration);
}
- private int getParallelism() {
- if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) {
- return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks();
- } else {
- return 1; // We need at least one rail/thread
- }
+ protected FileCollector createFileCollector() {
+ return new FileCollector(applicationConfiguration);
}
- private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
- List<FileCollectionData> fileCollects = new ArrayList<>();
+ protected DataRouterPublisher createDataRouterPublisher() {
+ return new DataRouterPublisher(applicationConfiguration);
+ }
- for (FileData fileData : availableFiles.files()) {
- fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData()));
- }
- return Flux.fromIterable(fileCollects);
+ private void onComplete(Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.trace("Datafile tasks have been completed");
+ }
+
+ private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.info("Datafile file published {}", model.getInternalLocation());
}
- private boolean shouldBePublished(FileCollectionData task) {
- return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
+ private void onError(Throwable throwable, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
- private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
- Map<String, String> contextMap) {
- final long maxNUmberOfRetries = 3;
- final Duration initialRetryTimeout = Duration.ofSeconds(5);
+ private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) {
+ boolean result = false;
+ Path localFileName = fileData.getLocalFileName();
+ if (alreadyPublishedFiles.put(localFileName) == null) {
+ result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap);
+ }
+ return result;
+ }
+ private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
return createFileCollector()
- .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
- contextMap)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
+ .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
Path localFileName = fileData.getLocalFileName();
- logger.error("File fetching failed: {}", localFileName);
+ logger.error("File fetching failed, fileData {}", fileData);
deleteFile(localFileName, contextMap);
alreadyPublishedFiles.remove(localFileName);
currentNumberOfTasks.decrementAndGet();
@@ -171,21 +169,17 @@ public class ScheduledTasks {
}
private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
- final long maxNumberOfRetries = 3;
- final Duration initialRetryTimeout = Duration.ofSeconds(5);
-
- DataRouterPublisher publisherTask = createDataRouterPublisher();
-
MdcVariables.setMdcContextMap(contextMap);
- return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
- .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
+
+ return createDataRouterPublisher()
+ .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
+ .onErrorResume(exception -> handlePublishFailure(model, contextMap));
}
- private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception,
- Map<String, String> contextMap) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
- Path internalFileName = Paths.get(model.getInternalLocation());
+ logger.error("File publishing failed: {}", model);
+ Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
@@ -196,8 +190,9 @@ public class ScheduledTasks {
* Fetch more messages from the message router. This is done in a polling/blocking fashion.
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
- logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
- if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
+ if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) {
+ logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks());
return Flux.empty();
}
@@ -209,7 +204,8 @@ public class ScheduledTasks {
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
- logger.error("Polling for file ready message failed, exception: {}", exception);
+ logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
+ this.applicationConfiguration.getDmaapConsumerConfiguration());
return Flux.empty();
}
@@ -223,20 +219,4 @@ public class ScheduledTasks {
}
}
- int getCurrentNumberOfTasks() {
- return currentNumberOfTasks.get();
- }
-
- DMaaPMessageConsumerTask createConsumerTask() {
- return new DMaaPMessageConsumerTask(this.applicationConfiguration);
- }
-
- FileCollector createFileCollector() {
- return new FileCollector(applicationConfiguration);
- }
-
- DataRouterPublisher createDataRouterPublisher() {
- return new DataRouterPublisher(applicationConfiguration);
- }
-
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index 2c136304..2c136304 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
index acae1e6e..b67fac23 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
@@ -28,56 +28,49 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
class CloudConfigParserTest {
-
- private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG =
- //@formatter:on
- new ImmutableDmaapConsumerConfiguration.Builder()
- .timeoutMs(-1)
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
- .dmaapPortNumber(2222)
- .dmaapContentType("application/json")
- .messageLimit(-1)
- .dmaapProtocol("http")
- .consumerId("C12")
- .consumerGroup("OpenDCAE-c12")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+ private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+ new ImmutableDmaapConsumerConfiguration.Builder() //
+ .timeoutMs(-1) //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("admin") //
+ .dmaapUserPassword("admin") //
+ .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .dmaapPortNumber(2222) //
+ .dmaapContentType("application/json") //
+ .messageLimit(-1) //
+ .dmaapProtocol("http") //
+ .consumerId("C12") //
+ .consumerGroup("OpenDCAE-c12") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
- //@formatter:off
-
- private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG =
- //@formatter:on
- new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapTopicName("publish")
- .dmaapUserPassword("dradmin")
- .dmaapPortNumber(3907)
- .dmaapProtocol("https")
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapUserName("dradmin")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+
+ private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+ new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapTopicName("publish") //
+ .dmaapUserPassword("dradmin") //
+ .dmaapPortNumber(3907) //
+ .dmaapProtocol("https") //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("dradmin") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
- //@formatter:off
-
- private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION =
- //@formatter:on
- new ImmutableFtpesConfig.Builder()
- .keyCert("/config/ftpKey.jks")
- .keyPassword("secret")
- .trustedCA("config/cacerts")
- .trustedCAPassword("secret")
+
+ private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
+ new ImmutableFtpesConfig.Builder() //
+ .keyCert("/config/ftpKey.jks") //
+ .keyPassword("secret") //
+ .trustedCA("config/cacerts") //
+ .trustedCAPassword("secret") //
.build();
- //@formatter:off
private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
index 1f5827c8..84c5e07b 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
@@ -44,6 +44,20 @@ public class FileDataTest {
private static final String LOCATION_WITHOUT_USER =
FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+ private MessageMetaData messageMetaData() {
+ return ImmutableMessageMetaData.builder()
+ .productName("PRODUCT_NAME")
+ .vendorName("VENDOR_NAME")
+ .lastEpochMicrosec("LAST_EPOCH_MICROSEC")
+ .sourceName("SOURCE_NAME")
+ .startEpochMicrosec("START_EPOCH_MICROSEC")
+ .timeZoneOffset("TIME_ZONE_OFFSET")
+ .changeIdentifier("PM_MEAS_CHANGE_IDENTIFIER")
+ .changeType("FILE_READY_CHANGE_TYPE")
+ .build();
+ }
+
private FileData properFileDataWithUser() {
// @formatter:off
return ImmutableFileData.builder()
@@ -53,6 +67,7 @@ public class FileDataTest {
.fileFormatType("type")
.fileFormatVersion("version")
.scheme(Scheme.FTPS)
+ .messageMetaData(messageMetaData())
.build();
// @formatter:on
}
@@ -66,6 +81,7 @@ public class FileDataTest {
.fileFormatType("type")
.fileFormatVersion("version")
.scheme(Scheme.FTPS)
+ .messageMetaData(messageMetaData())
.build();
// @formatter:on
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index f7b83297..b8aa7da2 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -29,7 +29,6 @@ import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -67,49 +66,47 @@ class JsonMessageParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ void whenPassingCorrectJson_oneFileReadyMessage() {
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ .files(files) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -122,49 +119,47 @@ class JsonMessageParserTest {
}
@Test
- void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ void whenPassingCorrectJsonWithTwoEvents_twoMessages() {
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .files(files) //
.build();
- // @formatter:on
+
String parsedString = message.getParsed();
String messageString = "[" + parsedString + "," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -178,21 +173,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutLocation_noMessage() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -205,49 +199,47 @@ class JsonMessageParserTest {
}
@Test
- void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() {
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .files(files) //
.build();
- // @formatter:on
+
String parsedString = message.getParsed();
String messageString = "[{\"event\":{}}," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
@@ -258,21 +250,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithFaultyEventName_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName("Faulty event name")
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName("Faulty event name") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -286,21 +277,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutName_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -314,14 +304,13 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() {
- // @formatter:off
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .build();
- // @formatter:on
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .build();
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -335,21 +324,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutCompression_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
- .build();
- // @formatter:on
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -363,21 +351,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithoutFileFormatType_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -391,55 +378,53 @@ class JsonMessageParserTest {
@Test
void whenPassingOneCorrectJsonWithoutFileFormatVersionAndOneCorrect_oneFileData() {
- // @formatter:off
- AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .build();
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
- .build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalFaultyField)
- .addAdditionalField(additionalField)
- .build();
-
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .build();
- FileData expectedFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .build();
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalFaultyField) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .build();
+ FileData expectedFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(expectedFileData);
- FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+ .files(files) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -453,14 +438,13 @@ class JsonMessageParserTest {
@Test
void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
- // @formatter:off
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier("PM_MEAS_FILES_INVALID")
- .changeType("FileReady_INVALID")
- .notificationFieldsVersion("1.0_INVALID")
- .build();
- // @formatter:on
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier("PM_MEAS_FILES_INVALID") //
+ .changeType("FileReady_INVALID") //
+ .notificationFieldsVersion("1.0_INVALID") //
+ .build();
+
String incorrectMessageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -486,21 +470,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
.compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(CHANGE_IDENTIFIER)
- .changeType(INCORRECT_CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .changeType(INCORRECT_CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -514,21 +497,20 @@ class JsonMessageParserTest {
@Test
void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
- // @formatter:off
- AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
- .name(PM_FILE_NAME)
- .location(LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage message = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER)
- .changeType(CHANGE_TYPE)
- .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
- .addAdditionalField(additionalField)
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
.build();
- // @formatter:on
+
String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index f88e301d..98c7dc32 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -20,7 +20,6 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -28,6 +27,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
@@ -74,7 +75,7 @@ public class DMaaPMessageConsumerTaskImplTest {
private static final String PORT_22 = "22";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME;
+ private static final Path LOCAL_FILE_LOCATION = Paths.get("target/" + PM_FILE_NAME);
private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
private static final String GZIP_COMPRESSION = "gzip";
@@ -86,7 +87,7 @@ public class DMaaPMessageConsumerTaskImplTest {
private static AppConfig appConfig;
private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
private DMaaPMessageConsumerTask messageConsumerTask;
- private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+ private DMaaPConsumerReactiveHttpClient httpClientMock;
private static String ftpesMessageString;
private static FileData ftpesFileData;
@@ -96,156 +97,161 @@ public class DMaaPMessageConsumerTaskImplTest {
private static FileData sftpFileData;
private static FileReadyMessage expectedSftpMessage;
+ /**
+ * Sets up data for the test.
+ */
@BeforeAll
public static void setUp() {
- //@formatter:off
- dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
- .consumerGroup("OpenDCAE-c12")
- .consumerId("c12")
- .dmaapContentType("application/json")
- .dmaapHostName("54.45.33.2")
- .dmaapPortNumber(1234).dmaapProtocol("https")
- .dmaapUserName("Datafile")
- .dmaapUserPassword("Datafile")
- .dmaapTopicName("unauthenticated.NOTIFICATION")
- .timeoutMs(-1)
- .messageLimit(-1)
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+ dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() //
+ .consumerGroup("OpenDCAE-c12") //
+ .consumerId("c12") //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234).dmaapProtocol("https") //
+ .dmaapUserName("Datafile") //
+ .dmaapUserPassword("Datafile") //
+ .dmaapTopicName("unauthenticated.NOTIFICATION") //
+ .timeoutMs(-1) //
+ .messageLimit(-1) //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
appConfig = mock(AppConfig.class);
- AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(FTPES_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
- .notificationFieldsVersion("1.0")
- .addAdditionalField(ftpesAdditionalField)
+ JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
+ .notificationFieldsVersion("1.0") //
+ .addAdditionalField(ftpesAdditionalField) //
.build();
ftpesMessageString = ftpesJsonMessage.toString();
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
.build();
- ftpesFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ftpesFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(FTPES_LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
List<FileData> files = new ArrayList<>();
files.add(ftpesFileData);
- expectedFtpesMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ expectedFtpesMessage = ImmutableFileReadyMessage.builder() //
+ .files(files) //
.build();
- AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(SFTP_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(SFTP_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
- .notificationFieldsVersion("1.0")
- .addAdditionalField(sftpAdditionalField)
+ JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
+ .notificationFieldsVersion("1.0") //
+ .addAdditionalField(sftpAdditionalField) //
.build();
sftpMessageString = sftpJsonMessage.toString();
- sftpFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(SFTP_LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ sftpFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(SFTP_LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .messageMetaData(messageMetaData)
.build();
- ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(PM_FILE_NAME) //
+ .location(FTPES_LOCATION) //
+ .internalLocation(LOCAL_FILE_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
listOfConsumerDmaapModel.add(consumerDmaapModel);
files = new ArrayList<>();
files.add(sftpFileData);
- expectedSftpMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ expectedSftpMessage = ImmutableFileReadyMessage.builder() //
+ .files(files) //
.build();
- //@formatter:on
}
@Test
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
prepareMocksForDmaapConsumer("", null);
- StepVerifier.create(messageConsumerTask.execute()).expectSubscription()
- .expectError(DatafileTaskException.class).verify();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectSubscription() //
+ .expectError(DatafileTaskException.class) //
+ .verify();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
}
@Test
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
- StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectNext(expectedFtpesMessage) //
+ .verifyComplete();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verifyNoMoreInteractions(httpClientMock);
}
@Test
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
- StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectNext(expectedSftpMessage) //
+ .verifyComplete();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verifyNoMoreInteractions(httpClientMock);
}
private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
Mono<String> messageAsMono = Mono.just(message);
JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
- dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
- when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
+ httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
+ when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
if (!message.isEmpty()) {
when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
@@ -255,9 +261,6 @@ public class DMaaPMessageConsumerTaskImplTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumerTask =
- spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock));
- when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
- doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient();
+ messageConsumerTask = spy(new DMaaPMessageConsumerTask(httpClientMock, jsonMessageParserMock));
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index d612d17c..fe867738 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -16,8 +16,9 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -25,19 +26,36 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Mono;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
import reactor.test.StepVerifier;
/**
@@ -52,31 +70,39 @@ class DataRouterPublisherTest {
private static final String START_EPOCH_MICROSEC = "8745745764578";
private static final String TIME_ZONE_OFFSET = "UTC+05:00";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+ private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + PM_FILE_NAME;
+
+ private static final String COMPRESSION = "gzip";
+ private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String FILE_FORMAT_VERSION = "V10";
+ private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+
+ private static final String HOST = "54.45.33.2";
+ private static final String HTTPS_SCHEME = "https";
+ private static final int PORT = 1234;
+ private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+ private static final String PUBLISH_TOPIC = "publish";
+ private static final String FEED_ID = "1";
+ private static final String FILE_CONTENT = "Just a string.";
private static ConsumerDmaapModel consumerDmaapModel;
- private static DataRouterPublisher dmaapPublisherTask;
- private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
+ private static DmaapProducerReactiveHttpClient httpClientMock;
private static AppConfig appConfig;
- private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private final Map<String, String> contextMap = new HashMap<>();
+ private static DataRouterPublisher publisherTaskUnderTestSpy;
+ /**
+ * Sets up data for tests.
+ */
@BeforeAll
public static void setUp() {
+ when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+ when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
- dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType("application/json") //
- .dmaapHostName("54.45.33.2") //
- .dmaapPortNumber(1234) //
- .dmaapProtocol("https") //
- .dmaapUserName("DFC") //
- .dmaapUserPassword("DFC") //
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build(); //
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
+ consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
.productName(PRODUCT_NAME) //
.vendorName(VENDOR_NAME) //
.lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
@@ -84,61 +110,144 @@ class DataRouterPublisherTest {
.startEpochMicrosec(START_EPOCH_MICROSEC) //
.timeZoneOffset(TIME_ZONE_OFFSET) //
.name(PM_FILE_NAME) //
- .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) //
- .internalLocation("target/" + PM_FILE_NAME) //
+ .location(FTPES_ADDRESS) //
+ .internalLocation(Paths.get("target/" + LOCAL_FILE_NAME)) //
.compression("gzip") //
- .fileFormatType("org.3GPP.32.435#measCollec") //
- .fileFormatVersion("V10") //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build(); //
appConfig = mock(AppConfig.class);
-
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
}
@Test
- public void whenPassedObjectFits_ReturnsCorrectStatus() {
- prepareMocksForTests(Mono.just(HttpStatus.OK));
-
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectNext(consumerDmaapModel).verifyComplete();
-
- verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
+
+ ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
+ verify(httpClientMock).getBaseUri();
+ verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
+ verifyNoMoreInteractions(httpClientMock);
+
+ HttpPut actualPut = (HttpPut) requestCaptor.getValue();
+ URI actualUri = actualPut.getURI();
+ assertEquals(HTTPS_SCHEME, actualUri.getScheme());
+ assertEquals(HOST, actualUri.getHost());
+ assertEquals(PORT, actualUri.getPort());
+ Path actualPath = Paths.get(actualUri.getPath());
+ assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
+ assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
+ assertTrue(LOCAL_FILE_NAME.equals(actualPath.getName(2).toString()));
+
+ Header[] contentHeaders = actualPut.getHeaders("content-type");
+ assertEquals(APPLICATION_OCTET_STREAM_CONTENT_TYPE, contentHeaders[0].getValue());
+
+ Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META);
+ Map<String, String> metaHash = getMetaDataAsMap(metaHeaders);
+ assertTrue(10 == metaHash.size());
+ assertEquals(PRODUCT_NAME, metaHash.get("productName"));
+ assertEquals(VENDOR_NAME, metaHash.get("vendorName"));
+ assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec"));
+ assertEquals(SOURCE_NAME, metaHash.get("sourceName"));
+ assertEquals(START_EPOCH_MICROSEC, metaHash.get("startEpochMicrosec"));
+ assertEquals(TIME_ZONE_OFFSET, metaHash.get("timeZoneOffset"));
+ assertEquals(COMPRESSION, metaHash.get("compression"));
+ assertEquals(FTPES_ADDRESS, metaHash.get("location"));
+ assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
+ assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
}
@Test
- public void whenPassedObjectFits_firstFailsThenSucceeds() {
- prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
+ void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
+ prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectNext(consumerDmaapModel).verifyComplete();
-
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), contextMap))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
}
@Test
- public void whenPassedObjectFits_firstFailsThenFails() {
- prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
-
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
- .expectErrorMessage("Retries exhausted: 1/1").verify();
+ public void whenPassedObjectFits_firstFailsThenSucceeds() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
+ Integer.valueOf(HttpStatus.OK.value()));
+
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
+ .expectNext(consumerDmaapModel) //
+ .verifyComplete();
+
+ verify(httpClientMock, times(2)).getBaseUri();
+ verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
+ verifyNoMoreInteractions(httpClientMock);
+ }
- verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ @Test
+ public void whenPassedObjectFits_firstFailsThenFails() throws Exception {
+ prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
+ Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
+
+ StepVerifier
+ .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
+ .expectErrorMessage("Retries exhausted: 1/1") //
+ .verify();
+
+ verify(httpClientMock, times(2)).getBaseUri();
+ verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
+ verifyNoMoreInteractions(httpClientMock);
}
@SafeVarargs
- final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
- dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any(), any())).thenReturn(firstResponse,
- nextHttpResponses);
-
- dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
- when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
- doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
+ final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
+ throws Exception {
+ httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+ when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration();
+ doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient();
+
+ UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
+ when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+
+ HttpResponse httpResponseMock = mock(HttpResponse.class);
+ if (exception == null) {
+ when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
+ .thenReturn(httpResponseMock);
+ } else {
+ when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
+ .thenThrow(exception).thenReturn(httpResponseMock);
+ }
+ StatusLine statusLineMock = mock(StatusLine.class);
+ when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+ when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses);
+
+ InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
+ doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", LOCAL_FILE_NAME));
+ }
+
+ private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) {
+ Map<String, String> metaHash = new HashMap<>();
+ String actualMetaData = metaHeaders[0].getValue();
+ actualMetaData = actualMetaData.substring(1, actualMetaData.length() - 1);
+ actualMetaData = actualMetaData.replace("\"", "");
+ String[] commaSplitedMetaData = actualMetaData.split(",");
+ for (int i = 0; i < commaSplitedMetaData.length; i++) {
+ String[] keyValuePair = commaSplitedMetaData[i].split(":");
+ if (keyValuePair.length > 2) {
+ List<String> arrayKeyValuePair = new ArrayList<>(keyValuePair.length);
+ for (int j = 1; j < keyValuePair.length; j++) {
+ arrayKeyValuePair.add(keyValuePair[j]);
+ }
+ keyValuePair[1] = String.join(":", arrayKeyValuePair);
+ }
+ metaHash.put(keyValuePair[0], keyValuePair[1]);
+ }
+ return metaHash;
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index c266d50e..6e17f27b 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -67,7 +67,7 @@ public class FileCollectorTest {
private static final int PORT_22 = 22;
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME);
+ private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SOURCE_NAME, PM_FILE_NAME);
private static final String USER = "usr";
private static final String PWD = "pwd";
private static final String FTPES_LOCATION =
@@ -93,6 +93,7 @@ public class FileCollectorTest {
private FtpsClient ftpsClientMock = mock(FtpsClient.class);
private SftpClient sftpClientMock = mock(SftpClient.class);
+ private final Map<String, String> contextMap = new HashMap<>();
private MessageMetaData createMessageMetaData() {
@@ -119,6 +120,7 @@ public class FileCollectorTest {
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
.scheme(scheme)
+ .messageMetaData(createMessageMetaData())
.build();
// @formatter:on
}
@@ -134,7 +136,7 @@ public class FileCollectorTest {
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
.location(location)
- .internalLocation(LOCAL_FILE_LOCATION.toString())
+ .internalLocation(LOCAL_FILE_LOCATION)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
@@ -160,10 +162,11 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
+ verify(ftpsClientMock, times(1)).open();
verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verify(ftpsClientMock, times(1)).close();
@@ -175,11 +178,13 @@ public class FileCollectorTest {
FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock));
doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any());
+
FileData fileData = createFileData(SFTP_LOCATION_NO_PORT, Scheme.SFTP);
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT);
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier
+ .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0),
+ contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
@@ -187,10 +192,13 @@ public class FileCollectorTest {
fileData = createFileData(SFTP_LOCATION, Scheme.SFTP);
expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier
+ .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0),
+ contextMap))
.expectNext(expectedConsumerDmaapModel) //
.verifyComplete();
+ verify(sftpClientMock, times(2)).open();
verify(sftpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verify(sftpClientMock, times(2)).close();
verifyNoMoreInteractions(sftpClientMock);
@@ -205,8 +213,8 @@ public class FileCollectorTest {
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectErrorMessage("Retries exhausted: 3/3").verify();
verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -222,11 +230,11 @@ public class FileCollectorTest {
ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
}
-
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
new file mode 100644
index 00000000..3e3c2ed6
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
@@ -0,0 +1,175 @@
+/*-
+* ============LICENSE_START=======================================================
+* Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
+/**
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedCheckerTest {
+ private static final String EMPTY_CONTENT = "[]";
+ private static final String FEEDLOG_TOPIC = "feedlog";
+ private static final String FEED_ID = "1";
+ private static final String HTTPS_SCHEME = "https";
+ private static final String HOST = "54.45.33.2";
+ private static final int PORT = 1234;
+ private static final String SOURCE_NAME = "oteNB5309";
+ private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME;
+
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+ private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static AppConfig appConfigMock;
+ private DmaapProducerReactiveHttpClient httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+
+ private PublishedChecker publishedCheckerUnderTestSpy;
+
+ /**
+ * Sets up data for the tests.
+ */
+ @BeforeAll
+ public static void setUp() {
+ when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+ when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+
+ appConfigMock = mock(AppConfig.class);
+ when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ }
+
+ @Test
+ public void executeWhenNotPublished_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+
+ ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
+ verify(httpClientMock).getBaseUri();
+ verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), anyInt(), any());
+ verifyNoMoreInteractions(httpClientMock);
+
+ HttpUriRequest getRequest = requestCaptor.getValue();
+ assertTrue(getRequest instanceof HttpGet);
+ URI actualUri = getRequest.getURI();
+ assertEquals(HTTPS_SCHEME, actualUri.getScheme());
+ assertEquals(HOST, actualUri.getHost());
+ assertEquals(PORT, actualUri.getPort());
+ Path actualPath = Paths.get(actualUri.getPath());
+ assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString()));
+ assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
+ String actualQuery = actualUri.getQuery();
+ assertTrue(actualQuery.contains("type=pub"));
+ assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME));
+ }
+
+ @Test
+ public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+ }
+
+ @Test
+ public void executeWhenPublished_returnsTrue() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertTrue(isPublished);
+ }
+
+ @Test
+ public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+ }
+
+ final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception {
+ publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock));
+
+ doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration();
+ doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient();
+
+ UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
+ when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+
+ HttpResponse httpResponseMock = mock(HttpResponse.class);
+ if (exception == null) {
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ .thenReturn(httpResponseMock);
+ } else {
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ .thenThrow(exception);
+ }
+ HttpEntity httpEntityMock = mock(HttpEntity.class);
+ StatusLine statusLineMock = mock(StatusLine.class);
+ when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+ when(statusLineMock.getStatusCode()).thenReturn(responseCode);
+ when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
+ InputStream stream = new ByteArrayInputStream(content.getBytes());
+ when(httpEntityMock.getContent()).thenReturn(stream);
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 8c4b3891..f6beae02 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -19,6 +19,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -26,9 +27,14 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.nio.file.Paths;
import java.time.Duration;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -43,6 +49,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -56,8 +63,10 @@ public class ScheduledTasksTest {
private int uniqueValue = 0;
private DMaaPMessageConsumerTask consumerMock;
+ private PublishedChecker publishedCheckerMock;
private FileCollector fileCollectorMock;
private DataRouterPublisher dataRouterMock;
+ private Map<String, String> contextMap = new HashMap<String, String>();
@BeforeEach
private void setUp() {
@@ -75,13 +84,15 @@ public class ScheduledTasksTest {
.keyStorePasswordPath("keyStorePasswordPath") //
.enableDmaapCertAuth(true) //
.build(); //
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
consumerMock = mock(DMaaPMessageConsumerTask.class);
+ publishedCheckerMock = mock(PublishedChecker.class);
fileCollectorMock = mock(FileCollector.class);
dataRouterMock = mock(DataRouterPublisher.class);
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
doReturn(fileCollectorMock).when(testedObject).createFileCollector();
doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
}
@@ -107,6 +118,7 @@ public class ScheduledTasksTest {
.location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
.scheme(Scheme.FTPS) //
.compression("") //
+ .messageMetaData(messageMetaData())
.build();
}
@@ -122,9 +134,7 @@ public class ScheduledTasksTest {
}
private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
- MessageMetaData md = messageMetaData();
- return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
- .files(files(numberOfFiles, uniqueNames)).build();
+ return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
}
private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
@@ -146,7 +156,7 @@ public class ScheduledTasksTest {
.timeZoneOffset("") //
.name("") //
.location("") //
- .internalLocation("internalLocation") //
+ .internalLocation(Paths.get("internalLocation")) //
.compression("") //
.fileFormatType("") //
.fileFormatVersion("") //
@@ -174,18 +184,20 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(noOfFiles) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -197,25 +209,27 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
Mono<Object> error = Mono.error(new Exception("problem"));
// First file collect will fail, 3 will succeed
doReturn(error, collectedFile, collectedFile, collectedFile) //
.when(fileCollectorMock) //
- .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
+ .execute(any(FileData.class), anyLong(), any(Duration.class), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(3) //
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -228,8 +242,10 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
Mono<Object> error = Mono.error(new Exception("problem"));
// One publish will fail, the rest will succeed
@@ -237,14 +253,14 @@ public class ScheduledTasksTest {
.when(dataRouterMock) //
.execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(3) // 3 completed files
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
@@ -260,18 +276,20 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
.expectNextCount(1) // 99 is skipped
.expectComplete() //
.verify(); //
assertEquals(0, testedObject.getCurrentNumberOfTasks());
verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(1)).execute(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
index 0eaa7a17..5e08efc7 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -30,6 +30,6 @@ public class DatafileTaskException extends Exception {
}
public DatafileTaskException(String message, Exception e) {
- super(message + e);
+ super(message, e);
}
}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
index 9f3a3188..f115dba7 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -20,15 +20,44 @@ package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import java.lang.reflect.Type;
+import java.nio.file.Path;
+/**
+ * Helper class to serialize object.
+ */
public class CommonFunctions {
- private static Gson gson = new GsonBuilder().serializeNulls().create();
+ private static Gson gson =
+ new GsonBuilder().registerTypeHierarchyAdapter(Path.class, new PathConverter()).serializeNulls().create();
- private CommonFunctions() {}
+ private CommonFunctions() {
+ }
+ /**
+ * Serializes a <code>ConsumerDmaapModel</code>.
+ *
+ * @param consumerDmaapModel model to serialize.
+ *
+ * @return a string with the serialized model.
+ */
public static String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
return gson.toJson(consumerDmaapModel);
}
-} \ No newline at end of file
+
+ /**
+ * Json serializer that handles Path serializations, since <code>Path</code> does not implement the
+ * <code>Serializable</code> interface.
+ */
+ public static class PathConverter implements JsonSerializer<Path> {
+ @Override
+ public JsonElement serialize(Path path, Type type, JsonSerializationContext jsonSerializationContext) {
+ return new JsonPrimitive(path.toString());
+ }
+ }
+}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
index 972316bf..2337485a 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
@@ -17,6 +17,9 @@
package org.onap.dcaegen2.collectors.datafile.model;
import com.google.gson.annotations.SerializedName;
+
+import java.nio.file.Path;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@@ -55,7 +58,7 @@ public interface ConsumerDmaapModel extends DmaapModel {
String getLocation();
@SerializedName("internalLocation")
- String getInternalLocation();
+ Path getInternalLocation();
@SerializedName("compression")
String getCompression();
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
index cbc3e122..25f0dbfc 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
@@ -1,58 +1,51 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.model;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-import org.junit.jupiter.api.Test;
+import java.nio.file.Paths;
-class CommonFunctionsTest {
- // @formatter:off
- private ConsumerDmaapModel model = ImmutableConsumerDmaapModel.builder()
- .productName("NrRadio")
- .vendorName("Ericsson")
- .lastEpochMicrosec("8745745764578")
- .sourceName("oteNB5309")
- .startEpochMicrosec("8745745764578")
- .timeZoneOffset("UTC+05:00")
- .name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz")
- .internalLocation("target/A20161224.1030-1045.bin.gz")
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
- .build();
+import org.junit.jupiter.api.Test;
- private static final String EXPECTED_RESULT =
- "{\"productName\":\"NrRadio\","
- + "\"vendorName\":\"Ericsson\","
- + "\"lastEpochMicrosec\":\"8745745764578\","
- + "\"sourceName\":\"oteNB5309\","
- + "\"startEpochMicrosec\":\"8745745764578\","
- + "\"timeZoneOffset\":\"UTC+05:00\","
- + "\"name\":\"A20161224.1030-1045.bin.gz\","
- + "\"location\":\"ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz\","
- + "\"internalLocation\":\"target/A20161224.1030-1045.bin.gz\","
- + "\"compression\":\"gzip\","
- + "\"fileFormatType\":\"org.3GPP.32.435#measCollec\","
- + "\"fileFormatVersion\":\"V10\"}";
- // @formatter:on
+public class CommonFunctionsTest {
@Test
- void createJsonBody_shouldReturnJsonInString() {
- assertEquals(EXPECTED_RESULT, CommonFunctions.createJsonBody(model));
+ public void createJsonBody_success() {
+ ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel //
+ .builder() //
+ .productName("") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .name("") //
+ .location("") //
+ .internalLocation(Paths.get("internalLocation")) //
+ .compression("") //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .build();
+ String actualBody = CommonFunctions.createJsonBody(consumerDmaapModel);
+
+ assertTrue(actualBody.contains("\"internalLocation\":\"internalLocation\""));
}
-} \ No newline at end of file
+}
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
index 2c5e701d..0c1ac436 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
@@ -16,6 +16,9 @@
package org.onap.dcaegen2.collectors.datafile.model;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -28,29 +31,27 @@ public class ConsumerDmaapModelTest {
private static final String TIME_ZONE_OFFSET = "UTC+05:00";
private static final String NAME = "A20161224.1030-1045.bin.gz";
private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz";
- private static final String INTERNAL_LOCATION = "target/A20161224.1030-1045.bin.gz";
+ private static final Path INTERNAL_LOCATION = Paths.get("target/A20161224.1030-1045.bin.gz");
private static final String COMPRESSION = "gzip";
private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
@Test
public void consumerDmaapModelBuilder_shouldBuildAnObject() {
- // @formatter:off
- ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(NAME)
- .location(LOCATION)
- .internalLocation(INTERNAL_LOCATION)
- .compression(COMPRESSION)
- .fileFormatType(FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(NAME) //
+ .location(LOCATION) //
+ .internalLocation(INTERNAL_LOCATION) //
+ .compression(COMPRESSION) //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- // @formatter:on
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(PRODUCT_NAME, consumerDmaapModel.getProductName());
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
index bca7dfd2..de50f24a 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -25,4 +25,5 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
*/
public interface FileCollectClient extends AutoCloseable {
public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
+ public void open() throws DatafileTaskException;
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 1bf3ec5a..492768c2 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.core.io.FileSystemResource;
/**
- * Gets file from xNF with FTPS protocol.
+ * Gets file from PNF with FTPS protocol.
*
* @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
*/
@@ -53,12 +53,22 @@ public class FtpsClient implements FileCollectClient {
private final FileServerData fileServerData;
private static TrustManager theTrustManager = null;
- public FtpsClient(FileServerData fileServerData) {
+ private final String keyCertPath;
+ private final String keyCertPassword;
+ private final Path trustedCAPath;
+ private final String trustedCAPassword;
+
+ public FtpsClient(FileServerData fileServerData, String keyCertPath, String keyCertPassword, Path trustedCAPath,
+ String trustedCAPassword) {
this.fileServerData = fileServerData;
+ this.keyCertPath = keyCertPath;
+ this.keyCertPassword = keyCertPassword;
+ this.trustedCAPath = trustedCAPath;
+ this.trustedCAPassword = trustedCAPassword;
}
- public void open(String keyCertPath, String keyCertPassword, Path trustedCAPath, String trustedCAPassword)
- throws DatafileTaskException {
+ @Override
+ public void open() throws DatafileTaskException {
try {
realFtpsClient.setNeedClientAuth(true);
realFtpsClient.setKeyManager(createKeyManager(keyCertPath, keyCertPassword));
@@ -67,7 +77,7 @@ public class FtpsClient implements FileCollectClient {
} catch (DatafileTaskException e) {
throw e;
} catch (Exception e) {
- throw new DatafileTaskException("Could not open connection: ", e);
+ throw new DatafileTaskException("Could not open connection: " + e, e);
}
}
@@ -100,7 +110,7 @@ public class FtpsClient implements FileCollectClient {
throw new DatafileTaskException("Could not retrieve file " + remoteFileName);
}
} catch (IOException e) {
- throw new DatafileTaskException("Could not fetch file: ", e);
+ throw new DatafileTaskException("Could not fetch file: " + e, e);
}
logger.trace("collectFile fetched: {}", localFileName);
}
@@ -135,12 +145,24 @@ public class FtpsClient implements FileCollectClient {
logger.trace("setUpConnection successfully!");
}
- InputStream createInputStream(Path localFileName) throws IOException {
+ private TrustManager createTrustManager(Path trustedCAPath, String trustedCAPassword)
+ throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
+ logger.trace("Creating trust manager from file: {}", trustedCAPath);
+ try (InputStream fis = createInputStream(trustedCAPath)) {
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(fis, trustedCAPassword.toCharArray());
+ TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509");
+ factory.init(keyStore);
+ return factory.getTrustManagers()[0];
+ }
+ }
+
+ protected InputStream createInputStream(Path localFileName) throws IOException {
FileSystemResource realResource = new FileSystemResource(localFileName);
return realResource.getInputStream();
}
- OutputStream createOutputStream(Path localFileName) throws IOException {
+ protected OutputStream createOutputStream(Path localFileName) throws IOException {
File localFile = localFileName.toFile();
if (localFile.createNewFile()) {
logger.warn("Local file {} already created", localFileName);
@@ -150,18 +172,7 @@ public class FtpsClient implements FileCollectClient {
return output;
}
- private TrustManager createTrustManager(Path trustedCAPath, String trustedCAPassword)
- throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
- try (InputStream fis = createInputStream(trustedCAPath)) {
- KeyStore keyStore = KeyStore.getInstance("JKS");
- keyStore.load(fis, trustedCAPassword.toCharArray());
- TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509");
- factory.init(keyStore);
- return factory.getTrustManagers()[0];
- }
- }
-
- TrustManager getTrustManager(Path trustedCAPath, String trustedCAPassword)
+ protected TrustManager getTrustManager(Path trustedCAPath, String trustedCAPassword)
throws KeyStoreException, NoSuchAlgorithmException, IOException, CertificateException {
synchronized (FtpsClient.class) {
if (theTrustManager == null) {
@@ -171,7 +182,7 @@ public class FtpsClient implements FileCollectClient {
}
}
- KeyManager createKeyManager(String keyCertPath, String keyCertPassword)
+ protected KeyManager createKeyManager(String keyCertPath, String keyCertPassword)
throws IOException, GeneralSecurityException {
return KeyManagerUtils.createClientKeyManager(new File(keyCertPath), keyCertPassword);
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index 2f489166..4517a755 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -47,7 +47,7 @@ public class SftpClient implements FileCollectClient {
@Override
public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
- logger.trace("collectFile called");
+ logger.trace("collectFile {}", localFile);
try {
sftpChannel.get(remoteFile, localFile.toString());
@@ -61,7 +61,7 @@ public class SftpClient implements FileCollectClient {
@Override
public void close() {
- logger.trace("close");
+ logger.trace("closing sftp session");
if (sftpChannel != null) {
sftpChannel.exit();
sftpChannel = null;
@@ -72,6 +72,7 @@ public class SftpClient implements FileCollectClient {
}
}
+ @Override
public void open() throws DatafileTaskException {
try {
if (session == null) {
@@ -79,7 +80,7 @@ public class SftpClient implements FileCollectClient {
sftpChannel = getChannel(session);
}
} catch (JSchException e) {
- throw new DatafileTaskException("Could not open Sftp client", e);
+ throw new DatafileTaskException("Could not open Sftp client" + e, e);
}
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
new file mode 100644
index 00000000..e01a941b
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class HttpAsyncClientBuilderWrapper implements IHttpAsyncClientBuilder {
+ HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
+
+ @Override
+ public IHttpAsyncClientBuilder setRedirectStrategy(RedirectStrategy redirectStrategy) {
+ builder.setRedirectStrategy(redirectStrategy);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setSSLContext(SSLContext sslcontext) {
+ builder.setSSLContext(sslcontext);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setSSLHostnameVerifier(HostnameVerifier hostnameVerifier) {
+ builder.setSSLHostnameVerifier(hostnameVerifier);
+ return this;
+ }
+
+ @Override
+ public IHttpAsyncClientBuilder setDefaultRequestConfig(RequestConfig config) {
+ builder.setDefaultRequestConfig(config);
+ return this;
+ }
+
+ @Override
+ public CloseableHttpAsyncClient build() {
+ return builder.build();
+ }
+
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java
new file mode 100644
index 00000000..e0a51a80
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface IHttpAsyncClientBuilder {
+ public IHttpAsyncClientBuilder setRedirectStrategy(final RedirectStrategy redirectStrategy);
+
+ public IHttpAsyncClientBuilder setSSLContext(final SSLContext sslcontext);
+
+ public IHttpAsyncClientBuilder setSSLHostnameVerifier(final HostnameVerifier hostnameVerifier);
+
+ public IHttpAsyncClientBuilder setDefaultRequestConfig(final RequestConfig config);
+
+ public CloseableHttpAsyncClient build();
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
deleted file mode 100644
index 5295b124..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-import org.springframework.core.io.FileSystemResource;
-
-public class FileSystemResourceWrapper implements IFileSystemResource {
- private FileSystemResource realResource;
-
- @Override
- public void setPath(Path path) {
- realResource = new FileSystemResource(path);
- }
- @Override
- public InputStream getInputStream() throws IOException {
- return realResource.getInputStream();
- }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
deleted file mode 100644
index 23f14a33..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-
-public interface IFileSystemResource {
-
- public void setPath(Path filePath);
-
- public InputStream getInputStream() throws IOException;
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 9304688f..944d3b34 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -16,54 +16,34 @@
package org.onap.dcaegen2.collectors.datafile.service.producer;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.Future;
import javax.net.ssl.SSLContext;
import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.ssl.SSLContextBuilder;
-import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
-import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
+import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import reactor.core.publisher.Mono;
+import org.springframework.web.util.UriBuilder;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -71,25 +51,13 @@ import reactor.core.publisher.Mono;
*/
public class DmaapProducerReactiveHttpClient {
- private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
- private static final String NAME_JSON_TAG = "name";
- private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
- private static final String URI_SEPARATOR = "/";
- private static final String DEFAULT_FEED_ID = "1";
+ private static final int NO_REQUEST_TIMEOUT = -1;
private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final String dmaapHostName;
- private final Integer dmaapPortNumber;
- private final String dmaapTopicName;
- private final String dmaapProtocol;
- private final String dmaapContentType;
- private final String user;
- private final String pwd;
-
- private IFileSystemResource fileResource = new FileSystemResourceWrapper();
+ private final DmaapPublisherConfiguration configuration;
/**
* Constructor DmaapProducerReactiveHttpClient.
@@ -97,98 +65,84 @@ public class DmaapProducerReactiveHttpClient {
* @param dmaapPublisherConfiguration - DMaaP producer configuration object
*/
public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
- this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
- this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
- this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
- this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
- this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
- this.user = dmaapPublisherConfiguration.dmaapUserName();
- this.pwd = dmaapPublisherConfiguration.dmaapUserPassword();
+ this.configuration = dmaapPublisherConfiguration;
}
- /**
- * Function for calling DMaaP HTTP producer - post request to DMaaP DataRouter.
- *
- * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
- * @return status code of operation
- */
- public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel,
- Map<String, String> contextMap) {
- MdcVariables.setMdcContextMap(contextMap);
- try (CloseableHttpAsyncClient webClient = createWebClient()) {
-
- HttpPut put = new HttpPut();
- prepareHead(consumerDmaapModel, put);
- prepareBody(consumerDmaapModel, put);
- addUserCredentialsToHead(put);
-
- logger.trace(INVOKE, "Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
- Future<HttpResponse> future = webClient.execute(put, null);
+ public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap)
+ throws DatafileTaskException {
+ try (CloseableHttpAsyncClient webClient = createWebClient(true, NO_REQUEST_TIMEOUT)) {
+ MdcVariables.setMdcContextMap(contextMap);
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
+ HttpResponse response = future.get();
+ logger.trace(INVOKE_RETURN, "Response from DR {}", response);
+ return response;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Unable to create web client.", e);
+ }
+ }
+
+ public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, int requestTimeout,
+ Map<String, String> contextMap) throws DatafileTaskException {
+ try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) {
+ MdcVariables.setMdcContextMap(contextMap);
+ webClient.start();
+
+ logger.trace(INVOKE, "Starting to produce to DR {}", request);
+ Future<HttpResponse> future = webClient.execute(request, null);
HttpResponse response = future.get();
logger.trace(INVOKE_RETURN, "Response from DR {}", response);
- return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+ return response;
} catch (Exception e) {
- logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
- return Mono.error(e);
+ throw new DatafileTaskException("Unable to create web client.", e);
}
}
- private void addUserCredentialsToHead(HttpPut put) {
- String plainCreds = user + ":" + pwd;
+ public void addUserCredentialsToHead(HttpUriRequest request) {
+ String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword();
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
String base64Creds = new String(base64CredsBytes);
logger.trace("base64Creds...: {}", base64Creds);
- put.addHeader("Authorization", "Basic " + base64Creds);
+ request.addHeader("Authorization", "Basic " + base64Creds);
}
- private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
- put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
- String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
- metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
- put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getUri(name));
-
- String requestID = MDC.get(REQUEST_ID);
- put.addHeader(X_ONAP_REQUEST_ID, requestID);
- String invocationID = UUID.randomUUID().toString();
- put.addHeader(X_INVOCATION_ID, invocationID);
+ public UriBuilder getBaseUri() {
+ return new DefaultUriBuilderFactory().builder() //
+ .scheme(configuration.dmaapProtocol()) //
+ .host(configuration.dmaapHostName()) //
+ .port(configuration.dmaapPortNumber());
}
- private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
- Path fileLocation = Paths.get(model.getInternalLocation());
- this.fileResource.setPath(fileLocation);
- InputStream fileInputStream = fileResource.getInputStream();
-
- put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, int requestTimeout)
+ throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ SSLContext sslContext =
+ new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
- }
+ IHttpAsyncClientBuilder clientBuilder = getHttpClientBuilder();
+ clientBuilder.setSSLContext(sslContext) //
+ .setSSLHostnameVerifier(new NoopHostnameVerifier());
- private URI getUri(String fileName) {
- String path = dmaapTopicName + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + fileName;
- return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
- .path(path).build();
- }
+ if (expectRedirect) {
+ clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ }
- void setFileSystemResource(IFileSystemResource fileSystemResource) {
- fileResource = fileSystemResource;
- }
+ if (requestTimeout > NO_REQUEST_TIMEOUT) {
+ RequestConfig requestConfig = RequestConfig.custom() //
+ .setSocketTimeout(requestTimeout) //
+ .setConnectTimeout(requestTimeout) //
+ .setConnectionRequestTimeout(requestTimeout) //
+ .build();
- protected CloseableHttpAsyncClient createWebClient()
- throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ clientBuilder.setDefaultRequestConfig(requestConfig);
+ }
- SSLContext sslContext = new SSLContextBuilder() //
- .loadTrustMaterial(null, (certificate, authType) -> true) //
- .build();
-
- CloseableHttpAsyncClient webClient = HttpAsyncClients.custom() //
- .setSSLContext(sslContext) //
- .setSSLHostnameVerifier(new NoopHostnameVerifier()) //
- .setRedirectStrategy(PublishRedirectStrategy.INSTANCE) //
- .build();
- webClient.start();
- return webClient;
+ return clientBuilder.build();
}
+ IHttpAsyncClientBuilder getHttpClientBuilder() {
+ return new HttpAsyncClientBuilderWrapper();
+ }
} \ No newline at end of file
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
index e2882606..9e6c29f8 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
@@ -73,7 +73,8 @@ public class FtpsClientTest {
@BeforeEach
protected void setUp() throws Exception {
- clientUnderTestSpy = spy(new FtpsClient(createFileServerData()));
+ clientUnderTestSpy = spy(new FtpsClient(createFileServerData(), FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH,
+ TRUSTED_CA_PASSWORD));
clientUnderTestSpy.realFtpsClient = ftpsClientMock;
}
@@ -104,7 +105,7 @@ public class FtpsClientTest {
doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
- clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ clientUnderTestSpy.open();
doReturn(true).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH);
@@ -124,9 +125,8 @@ public class FtpsClientTest {
public void collectFileFaultyOwnKey_shouldFail() throws Exception {
doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
- assertThatThrownBy(() -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH,
- TRUSTED_CA_PASSWORD)).hasMessageContaining(
- "Could not open connection: java.io.FileNotFoundException:");
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessageContaining("Could not open connection: java.io.FileNotFoundException:");
verify(ftpsClientMock).setNeedClientAuth(true);
@@ -142,9 +142,8 @@ public class FtpsClientTest {
doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD);
doThrow(new IOException("problem")).when(clientUnderTestSpy).createInputStream(TRUSTED_CA_PATH);
- assertThatThrownBy(
- () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD))
- .hasMessage("Could not open connection: java.io.IOException: problem");
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Could not open connection: java.io.IOException: problem");
}
@Test
@@ -153,9 +152,8 @@ public class FtpsClientTest {
doReturn(keyManagerMock).when(clientUnderTestSpy).createKeyManager(FTP_KEY_PATH, FTP_KEY_PASSWORD);
doReturn(inputStreamMock).when(clientUnderTestSpy).createInputStream(TRUSTED_CA_PATH);
- assertThatThrownBy(
- () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD))
- .hasMessage("Could not open connection: java.io.EOFException");
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Could not open connection: java.io.EOFException");
}
@Test
@@ -166,9 +164,7 @@ public class FtpsClientTest {
doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
doReturn(false).when(ftpsClientMock).login(USERNAME, PASSWORD);
- assertThatThrownBy(
- () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD))
- .hasMessage("Unable to log in to xNF. 127.0.0.1");
+ assertThatThrownBy(() -> clientUnderTestSpy.open()).hasMessage("Unable to log in to xNF. 127.0.0.1");
verify(ftpsClientMock).setNeedClientAuth(true);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -185,9 +181,8 @@ public class FtpsClientTest {
doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
doReturn(503).when(ftpsClientMock).getReplyCode();
- assertThatThrownBy(
- () -> clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD))
- .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
verify(ftpsClientMock).setNeedClientAuth(true);
verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -205,7 +200,7 @@ public class FtpsClientTest {
doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
- clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ clientUnderTestSpy.open();
doReturn(false).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
@@ -224,7 +219,7 @@ public class FtpsClientTest {
doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
- clientUnderTestSpy.open(FTP_KEY_PATH, FTP_KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ clientUnderTestSpy.open();
when(ftpsClientMock.isConnected()).thenReturn(false);
doThrow(new IOException("problem")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index 06ff570c..91c4c334 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -16,51 +16,44 @@
package org.onap.dcaegen2.collectors.datafile.service.producer;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
import org.apache.http.HttpResponse;
-import org.apache.http.StatusLine;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -68,32 +61,24 @@ import reactor.test.StepVerifier;
*/
class DmaapProducerReactiveHttpClientTest {
- private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
- private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
- private static final String NAME_JSON_TAG = "name";
- private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
-
private static final String HOST = "54.45.33.2";
private static final String HTTPS_SCHEME = "https";
private static final int PORT = 1234;
- private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
- private static final String URI_SEPARATOR = "/";
- private static final String PUBLISH_TOPIC = "publish";
- private static final String DEFAULT_FEED_ID = "1";
- private static final String FILE_CONTENT = "Just a string.";
+ private static final String USER_NAME = "dradmin";
+ private static final int TWO_SECOND_TIMEOUT = 2000;
- private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+
+ private DmaapProducerReactiveHttpClient producerClientUnderTestSpy;
private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
- private ConsumerDmaapModel consumerDmaapModel;
- private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class);
+ private IHttpAsyncClientBuilder clientBuilderMock;
+
private CloseableHttpAsyncClient clientMock;
- private HttpResponse responseMock = mock(HttpResponse.class);
@SuppressWarnings("unchecked")
private Future<HttpResponse> futureMock = mock(Future.class);
- private StatusLine statusLine = mock(StatusLine.class);
- private InputStream fileStream;
@BeforeEach
void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
@@ -102,82 +87,114 @@ class DmaapProducerReactiveHttpClientTest {
when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin");
when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin");
- when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn(APPLICATION_OCTET_STREAM_CONTENT_TYPE);
- when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn(PUBLISH_TOPIC);
-
- // @formatter:off
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName("NrRadio")
- .vendorName("Ericsson")
- .lastEpochMicrosec("8745745764578")
- .sourceName("oteNB5309")
- .startEpochMicrosec("8745745764578")
- .timeZoneOffset("UTC+05:00")
- .name("A20161224.1030-1045.bin.gz")
- .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz")
- .internalLocation("target/A20161224.1030-1045.bin.gz")
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
- .build();
- //formatter:on
-
- dmaapProducerReactiveHttpClient = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock));
- dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock);
+
+ producerClientUnderTestSpy = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock));
+
+ clientBuilderMock = mock(IHttpAsyncClientBuilder.class);
clientMock = mock(CloseableHttpAsyncClient.class);
- doReturn(clientMock).when(dmaapProducerReactiveHttpClient).createWebClient();
}
@Test
- void getHttpResponse_Success() throws Exception {
- mockWebClientDependantObject();
+ void getHttpResponseWithRederict_Success() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setSSLHostnameVerifier(any(NoopHostnameVerifier.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+ HttpResponse responseMock = mock(HttpResponse.class);
+ when(futureMock.get()).thenReturn(responseMock);
- HttpPut httpPut = new HttpPut();
- httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
+ HttpGet request = new HttpGet();
+ producerClientUnderTestSpy.getDmaapProducerResponseWithRedirect(request, CONTEXT_MAP);
- URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
- .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
- httpPut.setURI(expectedUri);
+ verify(clientBuilderMock).setSSLContext(any(SSLContext.class));
+ verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
+ verify(clientBuilderMock).setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+ verify(clientBuilderMock).build();
+ verifyNoMoreInteractions(clientBuilderMock);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
- metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
- metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
- httpPut.addHeader(X_DMAAP_DR_META, metaData.toString());
+ verify(clientMock).start();
+ verify(clientMock).close();
- String plainCreds = "dradmin" + ":" + "dradmin";
- byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
- byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
- String base64Creds = new String(base64CredsBytes);
- httpPut.addHeader("Authorization", "Basic " + base64Creds);
+ verify(futureMock).get();
+ verifyNoMoreInteractions(futureMock);
+ }
- fileStream.reset();
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap))
- .expectNext(HttpStatus.OK).verifyComplete();
+ @Test
+ void getHttpResponseWithCustomTimeout_Success() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+ HttpResponse responseMock = mock(HttpResponse.class);
+ when(futureMock.get()).thenReturn(responseMock);
- verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME));
- InputStream fileInputStream = fileSystemResourceMock.getInputStream();
- httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ HttpGet request = new HttpGet();
+ producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT, CONTEXT_MAP);
+
+ ArgumentCaptor<RequestConfig> requestConfigCaptor = ArgumentCaptor.forClass(RequestConfig.class);
+ verify(clientBuilderMock).setSSLContext(any(SSLContext.class));
+ verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
+ verify(clientBuilderMock).setDefaultRequestConfig(requestConfigCaptor.capture());
+ RequestConfig requestConfig = requestConfigCaptor.getValue();
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getSocketTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectTimeout());
+ assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectionRequestTimeout());
+ verify(clientBuilderMock).build();
+ verifyNoMoreInteractions(clientBuilderMock);
+
+ verify(clientMock).start();
+ verify(clientMock).close();
+
+ verify(futureMock).get();
+ verifyNoMoreInteractions(futureMock);
}
@Test
- void getHttpResponse_Fail() throws Exception {
- Map<String, String> contextMap = new HashMap<>();
- doReturn(futureMock).when(clientMock).execute(any(), any());
- doThrow(new InterruptedException()).when(futureMock).get();
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap)) //
- .expectError() //
- .verify(); //
+ public void getResponseWithException_throwsException() throws Exception {
+ doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+ when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+ when(clientBuilderMock.build()).thenReturn(clientMock);
+ HttpPut request = new HttpPut();
+ when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+
+ try {
+ when(futureMock.get()).thenThrow(new InterruptedException("Interrupted"));
+
+ producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT,
+ CONTEXT_MAP);
+
+ fail("Should have got an exception.");
+ } catch (DatafileTaskException e) {
+ assertTrue(e.getCause() instanceof InterruptedException);
+ assertEquals("Interrupted", e.getCause().getMessage());
+ } catch (Exception e) {
+ fail("Wrong exception");
+ }
+
+ verify(clientMock).start();
+ verify(clientMock).close();
}
- private void mockWebClientDependantObject()
- throws IOException, InterruptedException, ExecutionException {
- fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
- when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream);
- when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
- when(futureMock.get()).thenReturn(responseMock);
- when(responseMock.getStatusLine()).thenReturn(statusLine);
- when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK);
+ @Test
+ public void addCredentialsToHead_success() {
+ HttpPut request = new HttpPut();
+
+ producerClientUnderTestSpy.addUserCredentialsToHead(request);
+ String plainCreds = USER_NAME + ":" + USER_NAME;
+ byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
+ byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
+ String base64Creds = "Basic " + new String(base64CredsBytes);
+ Header[] authorizationHeaders = request.getHeaders("Authorization");
+ assertEquals(base64Creds, authorizationHeaders[0].getValue());
+ }
+
+ @Test
+ public void getBaseUri_success() {
+ URI uri = producerClientUnderTestSpy.getBaseUri().build();
+ assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString());
}
}