aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-09-19 12:55:18 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-09-20 12:50:32 +0200
commit3468d474187ef01546bdf1180d11453a4f924d31 (patch)
tree79aafcd9b4f7d36d27d0412f0ac3946c0ad79034 /datafile-app-server/src
parent2d1ea513ca0386102f2f9c11bd5c76f37939113a (diff)
Loading configuration from consul/cbs
*Registered task which calling cbs/consul for configuration - fixedRate 5 minutes *Added workflow for loading config from cloud Change-Id: Iba36d18b4ee0dca082612fa4c92c877f71c9b1fe Issue-ID: DCAEGEN2-784 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'datafile-app-server/src')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java65
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java99
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java83
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java17
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java29
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java41
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java (renamed from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java)22
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java102
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java91
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java12
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java10
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java104
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java25
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProviderTest.java90
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java3
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClientTest.java75
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java4
20 files changed, 858 insertions, 65 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
new file mode 100644
index 00000000..7570d704
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+public class CloudConfigParser {
+
+ private final JsonObject jsonObject;
+
+ CloudConfigParser(JsonObject jsonObject) {
+ this.jsonObject = jsonObject;
+ }
+
+ DmaapPublisherConfiguration getDmaapPublisherConfig() {
+ return new ImmutableDmaapPublisherConfiguration.Builder()
+ .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
+ .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
+ .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
+ .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
+ .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
+ .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
+ .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
+ .build();
+ }
+
+ DmaapConsumerConfiguration getDmaapConsumerConfig() {
+ return new ImmutableDmaapConsumerConfiguration.Builder()
+ .timeoutMS(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMS").getAsInt())
+ .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
+ .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
+ .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
+ .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
+ .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
+ .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
+ .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
+ .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
+ .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
+ .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
+ .build();
+ }
+} \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
new file mode 100644
index 00000000..7bf711bc
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -0,0 +1,99 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import com.google.gson.JsonObject;
+import java.util.Optional;
+import java.util.Properties;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.model.EnvProperties;
+import org.onap.dcaegen2.collectors.datafile.service.DatafileConfigurationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+@Configuration
+@EnableConfigurationProperties
+@EnableScheduling
+@Primary
+public class CloudConfiguration extends AppConfig {
+
+ private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class);
+
+ private DatafileConfigurationProvider datafileConfigurationProvider;
+ private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
+ private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration;
+
+ @Value("#{systemEnvironment}")
+ private Properties systemEnvironment;
+
+ @Autowired
+ public void setThreadPoolTaskScheduler(DatafileConfigurationProvider datafileConfigurationProvider) {
+ this.datafileConfigurationProvider = datafileConfigurationProvider;
+ }
+
+
+ protected void runTask() {
+ Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment))
+ .subscribeOn(Schedulers.parallel())
+ .subscribe(this::parsingConfigSuccess, this::parsingConfigError);
+ }
+
+ private void parsingConfigError(Throwable throwable) {
+ logger.warn("Error in case of processing system environment, more details below: ", throwable);
+ }
+
+ private void cloudConfigError(Throwable throwable) {
+ logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
+ }
+
+ private void parsingConfigSuccess(EnvProperties envProperties) {
+ logger.info("Fetching Datafile Collector configuration from ConfigBindingService/Consul");
+ datafileConfigurationProvider.callForDataFileCollectorConfiguration(envProperties)
+ .subscribe(this::parseCloudConfig, this::cloudConfigError);
+ }
+
+ private void parseCloudConfig(JsonObject jsonObject) {
+ logger.info("Received application configuration: {}", jsonObject);
+ CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
+ dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
+ dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
+ }
+
+ @Override
+ public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
+ }
+
+ @Override
+ public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
new file mode 100644
index 00000000..9f6b2737
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.util.Optional;
+import java.util.Properties;
+import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
+import org.onap.dcaegen2.collectors.datafile.model.EnvProperties;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableEnvProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+class EnvironmentProcessor {
+
+ private static final int DEFAULT_CONSUL_PORT = 8500;
+ private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
+
+ private EnvironmentProcessor() {
+ }
+
+ static Mono<EnvProperties> evaluate(Properties systemEnvironment) {
+ logger.info("Loading configuration from system environment variables");
+ EnvProperties envProperties;
+ try {
+ envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
+ .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment))
+ .appName(getService(systemEnvironment)).build();
+ } catch (EnvironmentLoaderException e) {
+ return Mono.error(e);
+ }
+ logger.info("Evaluated environment system variables {}", envProperties);
+ return Mono.just(envProperties);
+ }
+
+ private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
+ return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
+ .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
+ }
+
+ private static Integer getConsultPort(Properties systemEnvironments) {
+ return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf)
+ .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
+ }
+
+ private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
+ return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE"))
+ .orElseThrow(
+ () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
+ }
+
+ private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
+ return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
+ .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
+ .orElseThrow(() -> new EnvironmentLoaderException(
+ "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
+ }
+
+ private static Integer getDefaultPortOfConsul() {
+ logger.warn("$CONSUL_PORT environment has not been defined");
+ logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
+ return DEFAULT_CONSUL_PORT;
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 1d0a192f..512a2178 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -16,6 +16,8 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
@@ -40,16 +42,20 @@ import reactor.core.publisher.Mono;
@EnableScheduling
public class SchedulerConfig extends DatafileAppConfig {
- private static final int SCHEDULING_DELAY = 2000;
+ private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 10;
+ private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<ScheduledFuture>();
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
+ private final CloudConfiguration cloudConfiguration;
@Autowired
- public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask) {
+ public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask,
+ CloudConfiguration cloudConfiguration) {
this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTask;
+ this.cloudConfiguration = cloudConfiguration;
}
/**
@@ -62,7 +68,7 @@ public class SchedulerConfig extends DatafileAppConfig {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
return Mono.defer(() -> Mono
- .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+ .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
}
/**
@@ -74,8 +80,11 @@ public class SchedulerConfig extends DatafileAppConfig {
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
if (scheduledFutureList.isEmpty()) {
+ scheduledFutureList.add(taskScheduler
+ .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
+ Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask,
- SCHEDULING_DELAY));
+ Duration.ofSeconds(SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)));
return true;
} else {
return false;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
new file mode 100644
index 00000000..75c2e8a8
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.exceptions;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+public class EnvironmentLoaderException extends Exception {
+
+ public EnvironmentLoaderException(String message) {
+ super(message);
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java
index 0f03b1a4..1e2dcc91 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java
@@ -27,7 +27,7 @@ import java.util.List;
import org.apache.commons.io.FilenameUtils;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java
new file mode 100644
index 00000000..86549343
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+@Value.Immutable(prehash = true)
+public interface EnvProperties {
+
+ @Value.Parameter
+ String consulHost();
+
+ @Value.Parameter
+ Integer consulPort();
+
+ @Value.Parameter
+ String cbsName();
+
+ @Value.Parameter
+ String appName();
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 948976b6..48c4896f 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -16,22 +16,28 @@
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.service;
+package org.onap.dcaegen2.collectors.datafile.model;
+import org.immutables.gson.Gson;
import org.immutables.value.Value;
/**
* Contains data, from the fileReady event, about the file to collect from the xNF.
*
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- *
*/
@Value.Immutable
+@Gson.TypeAdapters
public interface FileData {
- public String changeIdentifier();
- public String changeType();
- public String location();
- public String compression();
- public String fileFormatType();
- public String fileFormatVersion();
+ String changeIdentifier();
+
+ String changeType();
+
+ String location();
+
+ String compression();
+
+ String fileFormatType();
+
+ String fileFormatVersion();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java
new file mode 100644
index 00000000..aca87e5b
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java
@@ -0,0 +1,102 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.collectors.datafile.model.EnvProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+@Service
+public class DatafileConfigurationProvider {
+
+ private static final Logger logger = LoggerFactory.getLogger(DatafileConfigurationProvider.class);
+
+ private final HttpGetClient httpGetClient;
+
+ DatafileConfigurationProvider() {
+ this(new HttpGetClient());
+ }
+
+ DatafileConfigurationProvider(HttpGetClient httpGetClient) {
+ this.httpGetClient = httpGetClient;
+ }
+
+ public Mono<JsonObject> callForDataFileCollectorConfiguration(EnvProperties envProperties) {
+ return callConsulForConfigBindingServiceEndpoint(envProperties)
+ .flatMap(this::callConfigBindingServiceForDatafileConfiguration);
+ }
+
+ private Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) {
+ logger.info("Retrieving Config Binding Service endpoint from Consul");
+ return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class)
+ .flatMap(jsonArray -> this.createConfigBindingServiceUrl(jsonArray, envProperties.appName()));
+
+ }
+
+ private String getConsulUrl(EnvProperties envProperties) {
+ return getUri(envProperties.consulHost(), envProperties.consulPort(), "/v1/catalog/service",
+ envProperties.cbsName());
+ }
+
+ private Mono<JsonObject> callConfigBindingServiceForDatafileConfiguration(String configBindingServiceUri) {
+ logger.info("Retrieving Datafile configuration");
+ return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class);
+ }
+
+
+ private Mono<String> createConfigBindingServiceUrl(JsonArray jsonArray, String appName) {
+ return getConfigBindingObject(jsonArray)
+ .flatMap(jsonObject -> buildConfigBindingServiceUrl(jsonObject, appName));
+ }
+
+ private Mono<String> buildConfigBindingServiceUrl(JsonObject jsonObject, String appName) {
+ return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(),
+ jsonObject.get("ServicePort").getAsInt(), "/service_component", appName));
+ }
+
+ private Mono<JsonObject> getConfigBindingObject(JsonArray jsonArray) {
+ try {
+ if (jsonArray.size() > 0) {
+ return Mono.just(jsonArray.get(0).getAsJsonObject());
+ } else {
+ throw new IllegalStateException("JSON Array was empty");
+ }
+ } catch (IllegalStateException e) {
+ logger.warn("Failed to retrieve JSON Object from array", e);
+ return Mono.error(e);
+ }
+ }
+
+ private String getUri(String host, Integer port, String... paths) {
+ return new DefaultUriBuilderFactory().builder()
+ .scheme("http")
+ .host(host)
+ .port(port)
+ .path(String.join("/", paths))
+ .build().toString();
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index c71d1435..72e7d497 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -30,6 +30,8 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseExcept
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
@@ -42,7 +44,7 @@ import reactor.core.publisher.Mono;
*/
public class DmaapConsumerJsonParser {
- private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
+ private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
private static final String EVENT = "event";
private static final String NOTIFICATION_FIELDS = "notificationFields";
@@ -59,8 +61,7 @@ public class DmaapConsumerJsonParser {
private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
/**
- * Extract info from string and create @see
- * {@link org.onap.dcaegen2.collectors.datafile.service.FileData}.
+ * Extract info from string and create @see {@link FileData}.
*
* @param monoMessage - results from DMaaP
* @return reactive Mono with an array of FileData
@@ -71,17 +72,17 @@ public class DmaapConsumerJsonParser {
private Mono<JsonElement> getJsonParserMessage(String message) {
return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
- : getFileDataFromJsonArray(jsonElement);
+ : getFileDataFromJsonArray(jsonElement);
}
private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) {
return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+ .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -90,8 +91,8 @@ public class DmaapConsumerJsonParser {
private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP)
- ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
- : transform(monoJsonP));
+ ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+ : transform(monoJsonP));
}
private Mono<List<FileData>> transform(JsonObject jsonObject) {
@@ -103,28 +104,28 @@ public class DmaapConsumerJsonParser {
JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
- && arrayOfNamedHashMap != null) {
+ && arrayOfNamedHashMap != null) {
Mono<List<FileData>> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
return res;
}
if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
return Mono.error(
- new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
+ new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
} else if (arrayOfNamedHashMap != null) {
return Mono.error(
- new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
+ new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
}
return Mono.error(
- new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
+ new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
}
return Mono.error(
- new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+ new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
}
private Mono<List<FileData>> getAllFileDataFromJson(String changeIdentifier, String changeType,
- JsonArray arrayOfAdditionalFields) {
+ JsonArray arrayOfAdditionalFields) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
if (arrayOfAdditionalFields.get(i) != null) {
@@ -134,7 +135,7 @@ public class DmaapConsumerJsonParser {
if (fileData != null) {
res.add(fileData);
} else {
- LOGGER.error("Unable to collect file from xNF. File information wrong. " + fileInfo);
+ logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo);
}
}
}
@@ -152,10 +153,10 @@ public class DmaapConsumerJsonParser {
String compression = getValueFromJson(data, COMPRESSION);
if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
- && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
+ && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType)
- .location(location).compression(compression).fileFormatType(fileFormatType)
- .fileFormatVersion(fileFormatVersion).build();
+ .location(location).compression(compression).fileFormatType(fileFormatType)
+ .fileFormatVersion(fileFormatVersion).build();
}
return fileData;
}
@@ -165,20 +166,20 @@ public class DmaapConsumerJsonParser {
}
private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
- String notificationFieldsVersion) {
+ String notificationFieldsVersion) {
return ((changeIdentifier != null && !changeIdentifier.isEmpty())
- && (changeType != null && !changeType.isEmpty())
- && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty()));
+ && (changeType != null && !changeType.isEmpty())
+ && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty()));
}
private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
return ((fileFormatVersion != null && !fileFormatVersion.isEmpty())
- && (fileFormatType != null && !fileFormatType.isEmpty()));
+ && (fileFormatType != null && !fileFormatType.isEmpty()));
}
private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
return (name != null && !name.isEmpty()) && (location != null && !location.isEmpty())
- && (compression != null && !compression.isEmpty());
+ && (compression != null && !compression.isEmpty());
}
private boolean containsHeader(JsonObject jsonObject) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java
new file mode 100644
index 00000000..796dbbd8
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java
@@ -0,0 +1,91 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+
+
+public class HttpGetClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class);
+
+ private final WebClient webClient;
+ private final Gson gson;
+
+ HttpGetClient() {
+ this(WebClient.builder().filter(logRequest()).filter(logResponse()).build());
+ }
+
+ HttpGetClient(WebClient webClient) {
+ this.webClient = webClient;
+ this.gson = new Gson();
+ }
+
+ <T> Mono<T> callHttpGet(String url, Class<T> genericClassDeclaration) {
+ return webClient
+ .get()
+ .uri(url)
+ .retrieve()
+ .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response)))
+ .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response)))
+ .bodyToMono(String.class)
+ .flatMap(body -> getJsonFromRequest(body, genericClassDeclaration));
+ }
+
+ private RuntimeException getException(ClientResponse response) {
+ return new RuntimeException(String.format("Request for cloud config failed: HTTP %d",
+ response.statusCode().value()));
+ }
+
+ private <T> Mono<T> getJsonFromRequest(String body, Class<T> genericClassDeclaration) {
+ try {
+ return Mono.just(parseJson(body, genericClassDeclaration));
+ } catch (JsonSyntaxException | IllegalStateException e) {
+ return Mono.error(e);
+ }
+ }
+
+ private <T> T parseJson(String body, Class<T> genericClassDeclaration) {
+ return gson.fromJson(body, genericClassDeclaration);
+ }
+
+ private static ExchangeFilterFunction logResponse() {
+ return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+ logger.info("Response status {}", clientResponse.statusCode());
+ return Mono.just(clientResponse);
+ });
+ }
+
+ private static ExchangeFilterFunction logRequest() {
+ return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+ logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+ clientRequest.headers()
+ .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+ return Mono.just(clientRequest);
+ });
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
index 30bf536e..0c76fc17 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
@@ -25,7 +25,7 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
-import org.onap.dcaegen2.collectors.datafile.service.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
import org.springframework.web.reactive.function.client.WebClient;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
index fdd1bb49..839e03c9 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
@@ -24,7 +24,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.Config;
import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.collectors.datafile.service.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,8 +55,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
}
protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
- DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
- DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) {
+ DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+ DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) {
this.datafileAppConfig = datafileAppConfig;
this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
@@ -79,7 +79,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
dmaaPConsumerReactiveHttpClient = resolveClient();
logger.trace("Method called with arg {}", object);
Mono<List<FileData>> consumerResult =
- consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
+ consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
return consumerResult.flatMap(this::getFilesFromSender);
}
@@ -95,8 +95,6 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
protected DmaapConsumerReactiveHttpClient resolveClient() {
- return dmaaPConsumerReactiveHttpClient == null
- ? new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient())
- : dmaaPConsumerReactiveHttpClient;
+ return new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
index 8c4d7072..5779051c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
@@ -49,15 +49,14 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
}
@Override
- public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels)
- throws DatafileTaskException {
+ public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) {
logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels);
return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels);
}
@Override
public Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModels)
- throws DatafileTaskException {
+ throws DatafileTaskException {
if (consumerDmaapModels == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
@@ -73,8 +72,7 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
@Override
DmaapProducerReactiveHttpClient resolveClient() {
- return dmaapProducerReactiveHttpClient == null
- ? new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient())
- : dmaapProducerReactiveHttpClient;
+ return new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient());
}
+
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
new file mode 100644
index 00000000..a4f098be
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
@@ -0,0 +1,104 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
+
+class CloudConfigParserTest {
+
+ private static final String correctJson =
+ "{\"dmaap.dmaapProducerConfiguration.dmaapTopicName\": \"/events/unauthenticated.VES_NOTIFICATION_OUTPUT\", "
+ + "\"dmaap.dmaapConsumerConfiguration.timeoutMS\": -1,"
+ + " \"dmaap.dmaapConsumerConfiguration.dmaapHostName\": \"message-router.onap.svc.cluster.local\","
+ + "\"dmaap.dmaapConsumerConfiguration.dmaapUserName\": \"admin\", "
+ + "\"dmaap.dmaapProducerConfiguration.dmaapPortNumber\": 3904, "
+ + "\"dmaap.dmaapConsumerConfiguration.dmaapUserPassword\": \"admin\", "
+ + "\"dmaap.dmaapProducerConfiguration.dmaapProtocol\": \"http\", "
+ + "\"dmaap.dmaapProducerConfiguration.dmaapContentType\": \"application/json\", "
+ + "\"dmaap.dmaapConsumerConfiguration.dmaapTopicName\": \"/events/unauthenticated.VES_NOTIFICATION_OUTPUT\", "
+ + "\"dmaap.dmaapConsumerConfiguration.dmaapPortNumber\": 3904, "
+ + "\"dmaap.dmaapConsumerConfiguration.dmaapContentType\": \"application/json\", "
+ + "\"dmaap.dmaapConsumerConfiguration.messageLimit\": -1, "
+ + "\"dmaap.dmaapConsumerConfiguration.dmaapProtocol\": \"http\", "
+ + "\"dmaap.dmaapConsumerConfiguration.consumerId\": \"c12\","
+ + "\"dmaap.dmaapProducerConfiguration.dmaapHostName\": \"message-router.onap.svc.cluster.local\", "
+ + "\"dmaap.dmaapConsumerConfiguration.consumerGroup\": \"OpenDCAE-c12\", "
+ + "\"dmaap.dmaapProducerConfiguration.dmaapUserName\": \"admin\", "
+ + "\"dmaap.dmaapProducerConfiguration.dmaapUserPassword\": \"admin\"}";
+
+ private static final ImmutableDmaapConsumerConfiguration correctDmaapConsumerConfig =
+ new ImmutableDmaapConsumerConfiguration.Builder()
+ .timeoutMS(-1)
+ .dmaapHostName("message-router.onap.svc.cluster.local")
+ .dmaapUserName("admin")
+ .dmaapUserPassword("admin")
+ .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
+ .dmaapPortNumber(3904)
+ .dmaapContentType("application/json")
+ .messageLimit(-1)
+ .dmaapProtocol("http")
+ .consumerId("c12")
+ .consumerGroup("OpenDCAE-c12")
+ .build();
+
+ private static final ImmutableDmaapPublisherConfiguration correctDmaapPublisherConfig =
+ new ImmutableDmaapPublisherConfiguration.Builder()
+ .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
+ .dmaapUserPassword("admin")
+ .dmaapPortNumber(3904)
+ .dmaapProtocol("http")
+ .dmaapContentType("application/json")
+ .dmaapHostName("message-router.onap.svc.cluster.local")
+ .dmaapUserName("admin")
+ .build();
+
+ private CloudConfigParser cloudConfigParser = new CloudConfigParser(
+ new Gson().fromJson(correctJson, JsonObject.class));
+
+
+ @Test
+ public void shouldCreateDmaapConsumerConfigurationCorrectly() {
+ // when
+ DmaapConsumerConfiguration dmaapConsumerConfig = cloudConfigParser.getDmaapConsumerConfig();
+
+ // then
+ assertThat(dmaapConsumerConfig).isNotNull();
+ assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(correctDmaapConsumerConfig);
+ }
+
+
+ @Test
+ public void shouldCreateDmaapPublisherConfigurationCorrectly() {
+ // when
+ DmaapPublisherConfiguration dmaapPublisherConfig = cloudConfigParser.getDmaapPublisherConfig();
+
+ // then
+ assertThat(dmaapPublisherConfig).isNotNull();
+ assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(correctDmaapPublisherConfig);
+ }
+} \ No newline at end of file
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java
index 5b9d0aaf..2f61ac97 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java
@@ -28,14 +28,13 @@ import java.util.List;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- *
*/
public class FileCollectorTest {
@@ -64,15 +63,15 @@ public class FileCollectorTest {
public void whenSingleFtpesFile_returnCorrectResponse() {
List<FileData> listOfFileData = new ArrayList<FileData>();
listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
+ .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22)
- .userId("").password("").build();
+ .userId("").password("").build();
when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true);
Mono<List<ConsumerDmaapModel>> consumerModelsMono =
- fileCollectorUndetTest.getFilesFromSender(listOfFileData);
+ fileCollectorUndetTest.getFilesFromSender(listOfFileData);
List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block();
assertEquals(1, consumerModels.size());
@@ -82,7 +81,7 @@ public class FileCollectorTest {
assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion());
assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation());
FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS)
- .userId("").password("").port(PORT_22).build();
+ .userId("").password("").port(PORT_22).build();
verify(ftpsClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verifyNoMoreInteractions(ftpsClientMock);
}
@@ -91,15 +90,15 @@ public class FileCollectorTest {
public void whenSingleSftpFile_returnCorrectResponse() {
List<FileData> listOfFileData = new ArrayList<FileData>();
listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
+ .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22)
- .userId("").password("").build();
+ .userId("").password("").build();
when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true);
Mono<List<ConsumerDmaapModel>> consumerModelsMono =
- fileCollectorUndetTest.getFilesFromSender(listOfFileData);
+ fileCollectorUndetTest.getFilesFromSender(listOfFileData);
List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block();
assertEquals(1, consumerModels.size());
@@ -109,7 +108,7 @@ public class FileCollectorTest {
assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion());
assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation());
FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS)
- .userId("").password("").port(PORT_22).build();
+ .userId("").password("").port(PORT_22).build();
verify(sftpClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
verifyNoMoreInteractions(ftpsClientMock);
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProviderTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProviderTest.java
new file mode 100644
index 00000000..efd89837
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProviderTest.java
@@ -0,0 +1,90 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.model.EnvProperties;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableEnvProperties;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class DatafileConfigurationProviderTest {
+ private static final Gson gson = new Gson();
+ private static final String configBindingService = "[{\"ID\":\"9c8dd674-34ce-7049-d318-e98d93a64303\",\"Node\""
+ + ":\"dcae-bootstrap\",\"Address\":\"10.42.52.82\",\"Datacenter\":\"dc1\",\"TaggedAddresses\":"
+ + "{\"lan\":\"10.42.52.82\",\"wan\":\"10.42.52.82\"},\"NodeMeta\":{\"consul-network-segment\":\"\"},"
+ + "\"ServiceID\":\"dcae-cbs1\",\"ServiceName\":\"config-binding-service\",\"ServiceTags\":[],"
+ + "\"ServiceAddress\":\"config-binding-service\",\"ServicePort\":10000,\"ServiceEnableTagOverride\":false,"
+ + "\"CreateIndex\":14352,\"ModifyIndex\":14352},{\"ID\":\"35c6f540-a29c-1a92-23b0-1305bd8c81f5\",\"Node\":"
+ + "\"dev-consul-server-1\",\"Address\":\"10.42.165.51\",\"Datacenter\":\"dc1\",\"TaggedAddresses\":"
+ + "{\"lan\":\"10.42.165.51\",\"wan\":\"10.42.165.51\"},\"NodeMeta\":{\"consul-network-segment\":\"\"},"
+ + "\"ServiceID\":\"dcae-cbs1\",\"ServiceName\":\"config-binding-service\",\"ServiceTags\":[],"
+ + "\"ServiceAddress\":\"config-binding-service\",\"ServicePort\":10000,\"ServiceEnableTagOverride\":false,"
+ + "\"CreateIndex\":803,\"ModifyIndex\":803}]";
+ private static final JsonArray configBindingServiceJson = gson.fromJson(configBindingService, JsonArray.class);
+ private static final JsonArray emptyConfigBindingServiceJson = gson.fromJson("[]", JsonArray.class);
+ private static final String datafileCollectorMockConfiguration = "{\"test\":1}";
+ private static final JsonObject datafileCollectorMockConfigurationJson = gson.fromJson(datafileCollectorMockConfiguration, JsonObject.class);
+
+ private EnvProperties envProperties = ImmutableEnvProperties.builder()
+ .appName("dcae-datafile-collector")
+ .cbsName("config-binding-service")
+ .consulHost("consul")
+ .consulPort(8500)
+ .build();
+
+ @Test
+ void shouldReturnDatafileCollectorConfiguration() {
+ // given
+ HttpGetClient webClient = mock(HttpGetClient.class);
+ when(
+ webClient.callHttpGet("http://consul:8500/v1/catalog/service/config-binding-service", JsonArray.class))
+ .thenReturn(Mono.just(configBindingServiceJson));
+ when(webClient.callHttpGet("http://config-binding-service:10000/service_component/dcae-datafile-collector", JsonObject.class))
+ .thenReturn(Mono.just(datafileCollectorMockConfigurationJson));
+
+ DatafileConfigurationProvider provider = new DatafileConfigurationProvider(webClient);
+
+ //when/then
+ StepVerifier.create(provider.callForDataFileCollectorConfiguration(envProperties)).expectSubscription()
+ .expectNext(datafileCollectorMockConfigurationJson).verifyComplete();
+ }
+
+ @Test
+ void shouldReturnMonoErrorWhenConsuleDoesntHaveConfigBindingServiceEntry() {
+ // given
+ HttpGetClient webClient = mock(HttpGetClient.class);
+ when(
+ webClient.callHttpGet("http://consul:8500/v1/catalog/service/config-binding-service", JsonArray.class))
+ .thenReturn(Mono.just(emptyConfigBindingServiceJson));
+
+ DatafileConfigurationProvider provider = new DatafileConfigurationProvider(webClient);
+
+ //when/then
+ StepVerifier.create(provider.callForDataFileCollectorConfiguration(envProperties)).expectSubscription()
+ .expectError(IllegalStateException.class).verify();
+ }
+} \ No newline at end of file
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
index 4aad5f45..8c36a51f 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
@@ -26,6 +26,8 @@ import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
@@ -44,6 +46,7 @@ class DmaapConsumerJsonParserTest {
@Test
void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
+
.location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
.fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClientTest.java
new file mode 100644
index 00000000..d95e3417
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClientTest.java
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSyntaxException;
+import org.junit.jupiter.api.Test;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class HttpGetClientTest {
+
+ private static final String SOMEURL = "http://someurl";
+ private static final String DATA = "{}";
+ private Gson gson = new Gson();
+ private WebClient webClient = mock(WebClient.class);
+ private WebClient.RequestHeadersUriSpec requestBodyUriSpec = mock(WebClient.RequestBodyUriSpec.class);
+ private WebClient.ResponseSpec responseSpec = mock(WebClient.ResponseSpec.class);
+
+ @Test
+ void shouldReturnJsonObjectOnGetCall() {
+ //given
+ mockWebClientDependantObject();
+ HttpGetClient httpGetClient = new HttpGetClient(webClient);
+ when(responseSpec.bodyToMono(String.class)).thenReturn(Mono.just(DATA));
+
+ //when/then
+ StepVerifier.create(httpGetClient.callHttpGet(SOMEURL, JsonObject.class)).expectSubscription()
+ .expectNext(gson.fromJson(DATA, JsonObject.class)).verifyComplete();
+ }
+
+ @Test
+ void shouldReturnMonoErrorOnGetCall() {
+ //given
+ mockWebClientDependantObject();
+ HttpGetClient httpGetClient = new HttpGetClient(webClient);
+ when(responseSpec.bodyToMono(String.class)).thenReturn(Mono.just("some wrong data"));
+
+ //when/then
+ StepVerifier.create(httpGetClient.callHttpGet(SOMEURL, JsonObject.class)).expectSubscription()
+ .expectError(JsonSyntaxException.class).verify();
+ }
+
+
+ private void mockWebClientDependantObject() {
+ doReturn(requestBodyUriSpec).when(webClient).get();
+ when(requestBodyUriSpec.uri(SOMEURL)).thenReturn(requestBodyUriSpec);
+ doReturn(responseSpec).when(requestBodyUriSpec).retrieve();
+ doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+ doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+ }
+} \ No newline at end of file
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
index c21c5988..e6818453 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
@@ -38,9 +38,9 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseExcept
import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.collectors.datafile.service.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;