From 8a84a9d39fe71984bd5d861e8c865e748ca3df30 Mon Sep 17 00:00:00 2001
From: pwielebs <piotr.wielebski@nokia.com>
Date: Mon, 7 Jan 2019 13:48:18 +0100
Subject: Integration with SDK

 * New fields added to DmaapPublisherConfiguraion
 * New fields added to DmaapConsumerConfiguration
 * Dmaap Consumer web client replaced by SDK's consumer web client
 * UTs aligned
 * Disable enforcer plugin in DFC
 * Update oparent to 1.2.1
Change-Id: I3569180c15227bc2c8df74fd070571b0aa56fa04
Issue-ID: DCAEGEN2-1096
Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
---
 datafile-app-server/config/datafile_endpoints.json |   7 +
 datafile-app-server/pom.xml                        |   4 +
 .../datafile/configuration/AppConfig.java          | 191 +++++++++++++--------
 .../datafile/configuration/CloudConfigParser.java  |  58 ++++---
 .../datafile/configuration/CloudConfiguration.java |   7 +-
 .../collectors/datafile/configuration/Config.java  |   5 +-
 .../datafile/configuration/DatafileAppConfig.java  |  18 +-
 .../datafile/tasks/DmaapConsumerTask.java          |   8 +-
 .../datafile/tasks/DmaapConsumerTaskImpl.java      |  17 +-
 .../datafile/tasks/DmaapPublisherTask.java         |   3 +-
 .../datafile/tasks/DmaapPublisherTaskImpl.java     |   3 +-
 .../configuration/CloudConfigParserTest.java       |  28 ++-
 .../configuration/DatafileAppConfigTest.java       |  10 +-
 .../datafile/tasks/DmaapConsumerTaskImplTest.java  |  27 +--
 .../datafile/tasks/DmaapPublisherTaskImplTest.java |  10 +-
 .../src/test/resources/datafile_endpoints.json     |   8 +-
 datafile-commons/pom.xml                           |   4 +
 .../collectors/datafile/model/CommonFunctions.java |   7 +-
 .../datafile/model/ConsumerDmaapModel.java         |   3 +-
 .../datafile/model/CommonFunctionsTest.java        |   2 +-
 datafile-dmaap-client/pom.xml                      |   4 +
 .../config/DmaapConsumerConfiguration.java         |  47 -----
 .../datafile/config/DmaapCustomConfig.java         |  70 --------
 .../config/DmaapPublisherConfiguration.java        |  33 ----
 .../datafile/service/DmaapReactiveWebClient.java   |   2 +-
 .../consumer/DmaapConsumerReactiveHttpClient.java  |  89 ----------
 .../producer/DmaapProducerReactiveHttpClient.java  |   5 +-
 .../service/DmaapReactiveWebClientTest.java        |   4 +-
 .../config/DmaapConsumerConfigurationTest.java     |  65 -------
 .../config/DmaapPublisherConfigurationTest.java    |  57 ------
 .../DmaapConsumerReactiveHttpClientTest.java       | 115 -------------
 .../DmaapProducerReactiveHttpClientTest.java       |   5 +-
 pom.xml                                            |  21 ++-
 33 files changed, 307 insertions(+), 630 deletions(-)
 delete mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java
 delete mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java
 delete mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java
 delete mode 100644 datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
 delete mode 100644 datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapConsumerConfigurationTest.java
 delete mode 100644 datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapPublisherConfigurationTest.java
 delete mode 100644 datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java

diff --git a/datafile-app-server/config/datafile_endpoints.json b/datafile-app-server/config/datafile_endpoints.json
index 9da01d7b..513ee247 100644
--- a/datafile-app-server/config/datafile_endpoints.json
+++ b/datafile-app-server/config/datafile_endpoints.json
@@ -31,6 +31,13 @@
                 "trustedCA": "config/cacerts",
                 "trustedCAPassword": "secret"
             }
+        },
+        "security": {
+            "trustStorePath" : "change it",
+            "trustStorePasswordPath" : "change it",
+            "keyStorePath" : "change it",
+            "keyStorePasswordPath" : "change it",
+            "enableDmaapCertAuth" : "false"
         }
     }
 }
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 5d5317ac..3a53c135 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -130,6 +130,10 @@
   </build>
 
   <dependencies>
+    <dependency>
+      <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+      <artifactId>dmaap-client</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
       <artifactId>cbs-client</artifactId>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 0df6b1d1..1e1b40d3 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -18,16 +18,17 @@
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
-import java.util.Optional;
-
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.stereotype.Component;
 
+import java.util.Optional;
+import java.util.function.Predicate;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -37,6 +38,7 @@ import org.springframework.stereotype.Component;
 @Configuration
 public class AppConfig extends DatafileAppConfig {
 
+    private static Predicate<String> isEmpty = String::isEmpty;
     @Value("${dmaap.dmaapConsumerConfiguration.dmaapHostName:}")
     public String consumerDmaapHostName;
 
@@ -103,84 +105,129 @@ public class AppConfig extends DatafileAppConfig {
     @Value("${ftp.ftpesConfiguration.trustedCAPassword:}")
     public String trustedCAPassword;
 
+    @Value("${security.trustStorePath:}")
+    public String trustStorePath;
+
+    @Value("${security.trustStorePasswordPath:}")
+    public String trustStorePasswordPath;
+
+    @Value("${security.keyStorePath:}")
+    public String keyStorePath;
+
+    @Value("${security.keyStorePasswordPath:}")
+    public String keyStorePasswordPath;
+
+    @Value("${security.enableDmaapCertAuth:}")
+    public Boolean enableDmaapCertAuth;
+
     @Override
     public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
         return new ImmutableDmaapConsumerConfiguration.Builder()
-            .dmaapUserPassword(
-                Optional.ofNullable(consumerDmaapUserPassword).filter(p -> !p.isEmpty())
-                    .orElse(dmaapConsumerConfiguration.dmaapUserPassword()))
-            .dmaapUserName(
-                Optional.ofNullable(consumerDmaapUserName).filter(p -> !p.isEmpty())
-                    .orElse(dmaapConsumerConfiguration.dmaapUserName()))
-            .dmaapHostName(
-                Optional.ofNullable(consumerDmaapHostName).filter(p -> !p.isEmpty())
-                    .orElse(dmaapConsumerConfiguration.dmaapHostName()))
-            .dmaapPortNumber(
-                Optional.ofNullable(consumerDmaapPortNumber).filter(p -> !p.toString().isEmpty())
-                    .orElse(dmaapConsumerConfiguration.dmaapPortNumber()))
-            .dmaapProtocol(
-                Optional.ofNullable(consumerDmaapProtocol).filter(p -> !p.isEmpty())
-                    .orElse(dmaapConsumerConfiguration.dmaapProtocol()))
-            .dmaapContentType(
-                Optional.ofNullable(consumerDmaapContentType).filter(p -> !p.isEmpty())
-                    .orElse(dmaapConsumerConfiguration.dmaapContentType()))
-            .dmaapTopicName(
-                Optional.ofNullable(consumerDmaapTopicName).filter(p -> !p.isEmpty())
-                    .orElse(dmaapConsumerConfiguration.dmaapTopicName()))
-            .messageLimit(
-                Optional.ofNullable(consumerMessageLimit).filter(p -> !p.toString().isEmpty())
-                    .orElse(dmaapConsumerConfiguration.messageLimit()))
-            .timeoutMS(Optional.ofNullable(consumerTimeoutMS).filter(p -> !p.toString().isEmpty())
-                .orElse(dmaapConsumerConfiguration.timeoutMS()))
-            .consumerGroup(Optional.ofNullable(consumerGroup).filter(p -> !p.isEmpty())
-                .orElse(dmaapConsumerConfiguration.consumerGroup()))
-            .consumerId(Optional.ofNullable(consumerId).filter(p -> !p.isEmpty())
-                .orElse(dmaapConsumerConfiguration.consumerId()))
-            .build();
+                .dmaapUserPassword(
+                        Optional.ofNullable(consumerDmaapUserPassword).filter(isEmpty.negate())
+                                .orElse(dmaapConsumerConfiguration.dmaapUserPassword()))
+                .dmaapUserName(
+                        Optional.ofNullable(consumerDmaapUserName).filter(isEmpty.negate())
+                                .orElse(dmaapConsumerConfiguration.dmaapUserName()))
+                .dmaapHostName(
+                        Optional.ofNullable(consumerDmaapHostName).filter(isEmpty.negate())
+                                .orElse(dmaapConsumerConfiguration.dmaapHostName()))
+                .dmaapPortNumber(
+                        Optional.ofNullable(consumerDmaapPortNumber).filter(p -> !p.toString().isEmpty())
+                                .orElse(dmaapConsumerConfiguration.dmaapPortNumber()))
+                .dmaapProtocol(
+                        Optional.ofNullable(consumerDmaapProtocol).filter(isEmpty.negate())
+                                .orElse(dmaapConsumerConfiguration.dmaapProtocol()))
+                .dmaapContentType(
+                        Optional.ofNullable(consumerDmaapContentType).filter(isEmpty.negate())
+                                .orElse(dmaapConsumerConfiguration.dmaapContentType()))
+                .dmaapTopicName(
+                        Optional.ofNullable(consumerDmaapTopicName).filter(isEmpty.negate())
+                                .orElse(dmaapConsumerConfiguration.dmaapTopicName()))
+                .messageLimit(
+                        Optional.ofNullable(consumerMessageLimit).filter(p -> !p.toString().isEmpty())
+                                .orElse(dmaapConsumerConfiguration.messageLimit()))
+                .timeoutMs(Optional.ofNullable(consumerTimeoutMS).filter(p -> !p.toString().isEmpty())
+                        .orElse(dmaapConsumerConfiguration.timeoutMs()))
+                .consumerGroup(Optional.ofNullable(consumerGroup).filter(isEmpty.negate())
+                        .orElse(dmaapConsumerConfiguration.consumerGroup()))
+                .consumerId(Optional.ofNullable(consumerId).filter(isEmpty.negate())
+                        .orElse(dmaapConsumerConfiguration.consumerId()))
+                .trustStorePath(
+                        Optional.ofNullable(trustStorePath).filter(isEmpty.negate())
+                                .orElse(dmaapConsumerConfiguration.trustStorePath()))
+                .trustStorePasswordPath(
+                        Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate())
+                                .orElse(dmaapConsumerConfiguration.trustStorePasswordPath()))
+                .keyStorePath(
+                        Optional.ofNullable(keyStorePath).filter(isEmpty.negate())
+                                .orElse(dmaapConsumerConfiguration.keyStorePath()))
+                .keyStorePasswordPath(
+                        Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate())
+                                .orElse(dmaapConsumerConfiguration.keyStorePasswordPath()))
+                .enableDmaapCertAuth(
+                        Optional.ofNullable(enableDmaapCertAuth).filter(p -> !p.toString().isEmpty())
+                                .orElse(dmaapConsumerConfiguration.enableDmaapCertAuth()))
+                .build();
     }
 
     @Override
     public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
         return new ImmutableDmaapPublisherConfiguration.Builder()
-            .dmaapContentType(
-                Optional.ofNullable(producerDmaapContentType).filter(p -> !p.isEmpty())
-                    .orElse(dmaapPublisherConfiguration.dmaapContentType()))
-            .dmaapHostName(
-                Optional.ofNullable(producerDmaapHostName).filter(p -> !p.isEmpty())
-                    .orElse(dmaapPublisherConfiguration.dmaapHostName()))
-            .dmaapPortNumber(
-                Optional.ofNullable(producerDmaapPortNumber).filter(p -> !p.toString().isEmpty())
-                    .orElse(dmaapPublisherConfiguration.dmaapPortNumber()))
-            .dmaapProtocol(
-                Optional.ofNullable(producerDmaapProtocol).filter(p -> !p.isEmpty())
-                    .orElse(dmaapPublisherConfiguration.dmaapProtocol()))
-            .dmaapTopicName(
-                Optional.ofNullable(producerDmaapTopicName).filter(p -> !p.isEmpty())
-                    .orElse(dmaapPublisherConfiguration.dmaapTopicName()))
-            .dmaapUserName(
-                Optional.ofNullable(producerDmaapUserName).filter(p -> !p.isEmpty())
-                    .orElse(dmaapPublisherConfiguration.dmaapUserName()))
-            .dmaapUserPassword(
-                Optional.ofNullable(producerDmaapUserPassword).filter(p -> !p.isEmpty())
-                    .orElse(dmaapPublisherConfiguration.dmaapUserPassword()))
-            .build();
+                .dmaapContentType(
+                        Optional.ofNullable(producerDmaapContentType).filter(isEmpty.negate())
+                                .orElse(dmaapPublisherConfiguration.dmaapContentType()))
+                .dmaapHostName(
+                        Optional.ofNullable(producerDmaapHostName).filter(isEmpty.negate())
+                                .orElse(dmaapPublisherConfiguration.dmaapHostName()))
+                .dmaapPortNumber(
+                        Optional.ofNullable(producerDmaapPortNumber).filter(p -> !p.toString().isEmpty())
+                                .orElse(dmaapPublisherConfiguration.dmaapPortNumber()))
+                .dmaapProtocol(
+                        Optional.ofNullable(producerDmaapProtocol).filter(isEmpty.negate())
+                                .orElse(dmaapPublisherConfiguration.dmaapProtocol()))
+                .dmaapTopicName(
+                        Optional.ofNullable(producerDmaapTopicName).filter(isEmpty.negate())
+                                .orElse(dmaapPublisherConfiguration.dmaapTopicName()))
+                .dmaapUserName(
+                        Optional.ofNullable(producerDmaapUserName).filter(isEmpty.negate())
+                                .orElse(dmaapPublisherConfiguration.dmaapUserName()))
+                .dmaapUserPassword(
+                        Optional.ofNullable(producerDmaapUserPassword).filter(isEmpty.negate())
+                                .orElse(dmaapPublisherConfiguration.dmaapUserPassword()))
+                .trustStorePath(
+                        Optional.ofNullable(trustStorePath).filter(isEmpty.negate())
+                                .orElse(dmaapPublisherConfiguration.trustStorePath()))
+                .trustStorePasswordPath(
+                        Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate())
+                                .orElse(dmaapPublisherConfiguration.trustStorePasswordPath()))
+                .keyStorePath(
+                        Optional.ofNullable(keyStorePath).filter(isEmpty.negate())
+                                .orElse(dmaapPublisherConfiguration.keyStorePath()))
+                .keyStorePasswordPath(
+                        Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate())
+                                .orElse(dmaapPublisherConfiguration.keyStorePasswordPath()))
+                .enableDmaapCertAuth(
+                        Optional.ofNullable(enableDmaapCertAuth).filter(p -> !p.toString().isEmpty())
+                                .orElse(dmaapPublisherConfiguration.enableDmaapCertAuth()))
+                .build();
     }
 
     @Override
     public FtpesConfig getFtpesConfiguration() {
         return new ImmutableFtpesConfig.Builder()
-            .keyCert(
-                Optional.ofNullable(keyCert).filter(p -> !p.isEmpty())
-                    .orElse(ftpesConfig.keyCert()))
-            .keyPassword(
-                    Optional.ofNullable(keyPassword).filter(p -> !p.isEmpty())
-                        .orElse(ftpesConfig.keyPassword()))
-            .trustedCA(
-                Optional.ofNullable(trustedCA).filter(p -> !p.isEmpty())
-                    .orElse(ftpesConfig.trustedCA()))
-            .trustedCAPassword(
-                Optional.ofNullable(trustedCAPassword).filter(p -> !p.isEmpty())
-                    .orElse(ftpesConfig.trustedCAPassword()))
-            .build();
+                .keyCert(
+                        Optional.ofNullable(keyCert).filter(isEmpty.negate())
+                                .orElse(ftpesConfig.keyCert()))
+                .keyPassword(
+                        Optional.ofNullable(keyPassword).filter(isEmpty.negate())
+                                .orElse(ftpesConfig.keyPassword()))
+                .trustedCA(
+                        Optional.ofNullable(trustedCA).filter(isEmpty.negate())
+                                .orElse(ftpesConfig.trustedCA()))
+                .trustedCAPassword(
+                        Optional.ofNullable(trustedCAPassword).filter(isEmpty.negate())
+                                .orElse(ftpesConfig.trustedCAPassword()))
+                .build();
     }
 }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index 7303a68f..44eba772 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -19,11 +19,11 @@
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
 import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
 
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
@@ -39,30 +39,40 @@ public class CloudConfigParser {
 
     DmaapPublisherConfiguration getDmaapPublisherConfig() {
         return new ImmutableDmaapPublisherConfiguration.Builder()
-            .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
-            .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
-            .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
-            .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
-            .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
-            .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
-            .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
-            .build();
+                .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
+                .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
+                .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
+                .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
+                .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
+                .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
+                .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
+                .trustStorePath(jsonObject.get("dmaap.security.trustStorePath").getAsString())
+                .trustStorePasswordPath(jsonObject.get("dmaap.security.trustStorePasswordPath").getAsString())
+                .keyStorePath(jsonObject.get("dmaap.security.keyStorePath").getAsString())
+                .keyStorePasswordPath(jsonObject.get("dmaap.security.keyStorePasswordPath").getAsString())
+                .enableDmaapCertAuth(jsonObject.get("dmaap.security.enableDmaapCertAuth").getAsBoolean())
+                .build();
     }
 
     DmaapConsumerConfiguration getDmaapConsumerConfig() {
         return new ImmutableDmaapConsumerConfiguration.Builder()
-            .timeoutMS(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMS").getAsInt())
-            .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
-            .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
-            .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
-            .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
-            .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
-            .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
-            .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
-            .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
-            .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
-            .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
-            .build();
+                .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMS").getAsInt())
+                .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
+                .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
+                .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
+                .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
+                .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
+                .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
+                .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
+                .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
+                .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
+                .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
+                .trustStorePath(jsonObject.get("dmaap.security.trustStorePath").getAsString())
+                .trustStorePasswordPath(jsonObject.get("dmaap.security.trustStorePasswordPath").getAsString())
+                .keyStorePath(jsonObject.get("dmaap.security.keyStorePath").getAsString())
+                .keyStorePasswordPath(jsonObject.get("dmaap.security.keyStorePasswordPath").getAsString())
+                .enableDmaapCertAuth(jsonObject.get("dmaap.security.enableDmaapCertAuth").getAsBoolean())
+                .build();
     }
 
     public FtpesConfig getFtpesConfig() {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 34ccd76c..9838afb1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -21,8 +21,9 @@ import com.google.gson.JsonObject;
 import java.util.Optional;
 import java.util.Properties;
 
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
 import org.slf4j.Logger;
@@ -47,7 +48,7 @@ import reactor.core.scheduler.Schedulers;
 @EnableConfigurationProperties
 @EnableScheduling
 @Primary
-public class CloudConfiguration extends AppConfig {   
+public class CloudConfiguration extends AppConfig {
     private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class);
     private ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider;
     private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java
index 7cd65ea2..7fe2561c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java
@@ -18,8 +18,9 @@
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
index 66480792..3af55453 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
@@ -26,8 +26,9 @@ import java.util.ServiceLoader;
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
 
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -56,7 +57,7 @@ public abstract class DatafileAppConfig implements Config {
     private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
     private static final String FTP = "ftp";
     private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
-
+    private static final String SECURITY = "security";
 
     private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class);
 
@@ -99,12 +100,14 @@ public abstract class DatafileAppConfig implements Config {
                 ftpesConfig = deserializeType(gsonBuilder,
                         jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
                         FtpesConfig.class);
-                dmaapConsumerConfiguration = deserializeType(gsonBuilder,
+                dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
                         jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER),
+                        rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
                         DmaapConsumerConfiguration.class);
 
-                dmaapPublisherConfiguration = deserializeType(gsonBuilder,
+                dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
                         jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER),
+                        rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
                         DmaapPublisherConfiguration.class);
             }
         } catch (IOException e) {
@@ -135,4 +138,9 @@ public abstract class DatafileAppConfig implements Config {
         this.filepath = filepath;
     }
 
+    private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
+        source.entrySet()
+                .forEach(entry -> target.add(entry.getKey(), entry.getValue()));
+        return target;
+    }
 }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
index 32fdbdc7..4fbc17f7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
@@ -18,11 +18,13 @@
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
+
 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
-import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
 import org.springframework.web.reactive.function.client.WebClient;
 
 import reactor.core.publisher.Flux;
@@ -36,7 +38,7 @@ abstract class DmaapConsumerTask {
 
     abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException;
 
-    abstract DmaapConsumerReactiveHttpClient resolveClient();
+    abstract DMaaPConsumerReactiveHttpClient resolveClient();
 
     abstract void initConfigs();
 
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
index f80db897..5bd0bf30 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
@@ -16,12 +16,13 @@
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.Config;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -41,7 +42,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
     private Config datafileAppConfig;
     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
-    private DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
+    private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
 
     @Autowired
     public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) {
@@ -50,8 +51,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
     }
 
     protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
-            DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
-            DmaapConsumerJsonParser dmaapConsumerJsonParser) {
+                                    DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+                                    DmaapConsumerJsonParser dmaapConsumerJsonParser) {
         this.datafileAppConfig = datafileAppConfig;
         this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
         this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
@@ -67,7 +68,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
     protected Flux<FileData> execute(String object) {
         dmaaPConsumerReactiveHttpClient = resolveClient();
         logger.trace("execute called with arg {}", object);
-        return consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
+        return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
     }
 
     @Override
@@ -81,7 +82,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
     }
 
     @Override
-    protected DmaapConsumerReactiveHttpClient resolveClient() {
-        return new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient());
+    protected DMaaPConsumerReactiveHttpClient resolveClient() {
+        return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
     }
 }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
index 8508cd12..cb194cf5 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
@@ -16,10 +16,11 @@
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
 
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import reactor.core.publisher.Flux;
 
 /**
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
index 201b33d1..56a2fc2a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
@@ -16,11 +16,12 @@
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.Config;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
index 119224b4..2e6b63b8 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
@@ -21,24 +21,32 @@ import static org.assertj.core.api.Assertions.assertThat;
 import com.google.gson.JsonObject;
 
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
 
 class CloudConfigParserTest {
 
     private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG =
-            new ImmutableDmaapConsumerConfiguration.Builder().timeoutMS(-1)
+            new ImmutableDmaapConsumerConfiguration.Builder().timeoutMs(-1)
                     .dmaapHostName("message-router.onap.svc.cluster.local").dmaapUserName("admin")
                     .dmaapUserPassword("admin").dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
                     .dmaapPortNumber(2222).dmaapContentType("application/json").messageLimit(-1).dmaapProtocol("http")
-                    .consumerId("C12").consumerGroup("OpenDCAE-c12").build();
+                    .consumerId("C12").consumerGroup("OpenDCAE-c12").trustStorePath("trustStorePath")
+                    .trustStorePasswordPath("trustStorePasswordPath").keyStorePath("keyStorePath")
+                    .keyStorePasswordPath("keyStorePasswordPath").enableDmaapCertAuth(true)
+                    .build();
 
     private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG =
             new ImmutableDmaapPublisherConfiguration.Builder().dmaapTopicName("publish").dmaapUserPassword("dradmin")
                     .dmaapPortNumber(3907).dmaapProtocol("https").dmaapContentType("application/json")
-                    .dmaapHostName("message-router.onap.svc.cluster.local").dmaapUserName("dradmin").build();
+                    .dmaapHostName("message-router.onap.svc.cluster.local").dmaapUserName("dradmin")
+                    .trustStorePath("trustStorePath")
+                    .trustStorePasswordPath("trustStorePasswordPath").keyStorePath("keyStorePath")
+                    .keyStorePasswordPath("keyStorePasswordPath").enableDmaapCertAuth(true)
+                    .build();
 
     private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION =
             new ImmutableFtpesConfig.Builder().keyCert("/config/ftpKey.jks").keyPassword("secret")
@@ -96,6 +104,12 @@ class CloudConfigParserTest {
         config.addProperty("dmaap.ftpesConfig.trustedCA", "config/cacerts");
         config.addProperty("dmaap.ftpesConfig.trustedCAPassword", "secret");
 
+        config.addProperty("dmaap.security.trustStorePath", "trustStorePath");
+        config.addProperty("dmaap.security.trustStorePasswordPath", "trustStorePasswordPath");
+        config.addProperty("dmaap.security.keyStorePath", "keyStorePath");
+        config.addProperty("dmaap.security.keyStorePasswordPath", "keyStorePasswordPath");
+        config.addProperty("dmaap.security.enableDmaapCertAuth", "true");
+
         return config;
     }
 }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
index 1238a472..62302793 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
@@ -176,7 +176,7 @@ class DatafileAppConfigTest {
         dmaapConsumerConfigData.addProperty("dmaapContentType", "application/json");
         dmaapConsumerConfigData.addProperty("consumerId", "C12");
         dmaapConsumerConfigData.addProperty("consumerGroup", "OpenDcae-c12");
-        dmaapConsumerConfigData.addProperty("timeoutMS", -1);
+        dmaapConsumerConfigData.addProperty("timeoutMs", -1);
         dmaapConsumerConfigData.addProperty("messageLimit", 1);
 
         JsonObject dmaapProducerConfigData = new JsonObject();
@@ -200,12 +200,20 @@ class DatafileAppConfigTest {
         ftpesConfigData.addProperty("trustedCA", "config/cacerts");
         ftpesConfigData.addProperty("trustedCAPassword", "secret");
 
+        JsonObject security = new JsonObject();
+        security.addProperty("trustStorePath", "trustStorePath");
+        security.addProperty("trustStorePasswordPath", "trustStorePasswordPath");
+        security.addProperty("keyStorePath", "keyStorePath");
+        security.addProperty("keyStorePasswordPath", "keyStorePasswordPath");
+        security.addProperty("enableDmaapCertAuth", "enableDmaapCertAuth");
+
         JsonObject ftpesConfiguration = new JsonObject();
         ftpesConfiguration.add("ftpesConfiguration", ftpesConfigData);
 
         JsonObject configs = new JsonObject();
         configs.add("dmaap", dmaapConfigs);
         configs.add("ftp", ftpesConfiguration);
+        configs.add("security", security);
 
         JsonObject completeJson = new JsonObject();
         completeJson.add("configs", configs);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
index c6d115f6..f8f6cf64 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
@@ -29,8 +29,7 @@ import java.util.List;
 
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
@@ -41,10 +40,13 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
 import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
+
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
 
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -81,7 +83,7 @@ class DmaapConsumerTaskImplTest {
     private static AppConfig appConfig;
     private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
     private DmaapConsumerTaskImpl dmaapConsumerTask;
-    private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+    private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
 
     private static String ftpesMessage;
     private static FileData ftpesFileData;
@@ -101,8 +103,13 @@ class DmaapConsumerTaskImplTest {
                 .dmaapUserName("Datafile")
                 .dmaapUserPassword("Datafile")
                 .dmaapTopicName("unauthenticated.NOTIFICATION")
-                .timeoutMS(-1)
+                .timeoutMs(-1)
                 .messageLimit(-1)
+                .trustStorePath("trustStorePath")
+                .trustStorePasswordPath("trustStorePasswordPath")
+                .keyStorePath("keyStorePath")
+                .keyStorePasswordPath("keyStorePasswordPath")
+                .enableDmaapCertAuth(true)
                 .build();
 
         appConfig = mock(AppConfig.class);
@@ -191,7 +198,7 @@ class DmaapConsumerTaskImplTest {
         StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
                 .expectError(DmaapEmptyResponseException.class).verify();
 
-        verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
+        verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
     }
 
     @Test
@@ -200,7 +207,7 @@ class DmaapConsumerTaskImplTest {
 
         StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete();
 
-        verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
+        verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
         verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
     }
 
@@ -210,15 +217,15 @@ class DmaapConsumerTaskImplTest {
 
         StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete();
 
-        verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
+        verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
         verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
     }
 
     private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) {
         Mono<String> messageAsMono = Mono.just(message);
         DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class);
-        dmaapConsumerReactiveHttpClient = mock(DmaapConsumerReactiveHttpClient.class);
-        when(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).thenReturn(messageAsMono);
+        dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
+        when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
 
         if (!message.isEmpty()) {
             when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume));
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
index 7770eec4..5b29bf10 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
@@ -27,12 +27,13 @@ import static org.mockito.Mockito.when;
 
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
 import org.springframework.http.HttpStatus;
 
 import reactor.core.publisher.Flux;
@@ -68,6 +69,11 @@ class DmaapPublisherTaskImplTest {
                 .dmaapUserName("DFC")
                 .dmaapUserPassword("DFC")
                 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT")
+                .trustStorePath("trustStorePath")
+                .trustStorePasswordPath("trustStorePasswordPath")
+                .keyStorePath("keyStorePath")
+                .keyStorePasswordPath("keyStorePasswordPath")
+                .enableDmaapCertAuth(true)
                 .build();
         consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
                 .productName(PRODUCT_NAME)
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints.json b/datafile-app-server/src/test/resources/datafile_endpoints.json
index 1b2ab1ec..af39ac9c 100644
--- a/datafile-app-server/src/test/resources/datafile_endpoints.json
+++ b/datafile-app-server/src/test/resources/datafile_endpoints.json
@@ -10,7 +10,6 @@
                 "dmaapUserName": "admin",
                 "dmaapUserPassword": "admin",
                 "dmaapContentType": "application/json",
-                "consumerId": "C12",
                 "consumerGroup": "OpenDcae-c12",
                 "timeoutMS": -1,
                 "messageLimit": 1
@@ -32,6 +31,13 @@
                 "trustedCA": "/config/ftpKey.jks",
                 "trustedCAPassword": "secret"
             }
+        },
+        "security": {
+            "trustStorePath" : "trustStorePath",
+            "trustStorePasswordPath" : "trustStorePasswordPath",
+            "keyStorePath" : "keyStorePath",
+            "keyStorePasswordPath" : "keyStorePasswordPath",
+            "enableDmaapCertAuth" : "enableDmaapCertAuth"
         }
     }
 }
diff --git a/datafile-commons/pom.xml b/datafile-commons/pom.xml
index 4ef2c686..400ca28a 100644
--- a/datafile-commons/pom.xml
+++ b/datafile-commons/pom.xml
@@ -32,6 +32,10 @@
   <packaging>jar</packaging>
 
   <dependencies>
+    <dependency>
+      <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+      <artifactId>common-dependency</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.immutables</groupId>
       <artifactId>value</artifactId>
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
index 3be7bcf6..801f1705 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -20,15 +20,14 @@ package org.onap.dcaegen2.collectors.datafile.model;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
 
 
-public class CommonFunctions {
+public class CommonFunctions implements JsonBodyBuilder<ConsumerDmaapModel> {
 
     private static Gson gson = new GsonBuilder().serializeNulls().create();
 
-    private CommonFunctions() {}
-
-    public static String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
+    public  String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
         return gson.toJson(consumerDmaapModel);
     }
 }
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
index 62315602..883a73af 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
@@ -20,6 +20,7 @@ import com.google.gson.annotations.SerializedName;
 
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
@@ -28,7 +29,7 @@ import org.immutables.value.Value;
 
 @Value.Immutable
 @Gson.TypeAdapters
-public interface ConsumerDmaapModel {
+public interface ConsumerDmaapModel extends DmaapModel {
 
     @SerializedName("productName")
     String getProductName();
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
index 572bca85..cb6c48d9 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
@@ -53,6 +53,6 @@ class CommonFunctionsTest {
     // @formatter:on
     @Test
     void createJsonBody_shouldReturnJsonInString() {
-        assertEquals(EXPECTED_RESULT, CommonFunctions.createJsonBody(model));
+        assertEquals(EXPECTED_RESULT, new CommonFunctions().createJsonBody(model));
     }
 }
diff --git a/datafile-dmaap-client/pom.xml b/datafile-dmaap-client/pom.xml
index 6d813c85..0f3cf6aa 100644
--- a/datafile-dmaap-client/pom.xml
+++ b/datafile-dmaap-client/pom.xml
@@ -38,6 +38,10 @@
 
   <dependencies>
     <!-- DEVELOPMENT DEPENDENCIES -->
+    <dependency>
+      <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+      <artifactId>dmaap-client</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.asynchttpclient</groupId>
       <artifactId>async-http-client</artifactId>
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java
deleted file mode 100644
index 568d4ecc..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapConsumerConfiguration.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.config;
-
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
-import org.springframework.stereotype.Component;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- */
-@Component
-@Value.Immutable(prehash = true)
-@Value.Style(builder = "new")
-@Gson.TypeAdapters
-public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig {
-
-    private static final long serialVersionUID = 1L;
-
-    @Value.Parameter
-    public abstract String consumerId();
-
-    @Value.Parameter
-    public abstract String consumerGroup();
-
-    @Value.Parameter
-    public abstract Integer timeoutMS();
-
-    @Value.Parameter
-    public abstract Integer messageLimit();
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java
deleted file mode 100644
index 0b1d99eb..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapCustomConfig.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.config;
-
-import java.io.Serializable;
-
-import org.immutables.value.Value;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/28/18
- */
-public interface DmaapCustomConfig extends Serializable {
-
-    @Value.Parameter
-    String dmaapHostName();
-
-    @Value.Parameter
-    Integer dmaapPortNumber();
-
-    @Value.Parameter
-    String dmaapTopicName();
-
-    @Value.Parameter
-    String dmaapProtocol();
-
-    @Value.Parameter
-    String dmaapUserName();
-
-    @Value.Parameter
-    String dmaapUserPassword();
-
-    @Value.Parameter
-    String dmaapContentType();
-
-
-    interface Builder<T extends DmaapCustomConfig, B extends Builder<T, B>> {
-
-        B dmaapHostName(String dmaapHostName);
-
-        B dmaapPortNumber(Integer dmaapPortNumber);
-
-        B dmaapTopicName(String dmaapTopicName);
-
-        B dmaapProtocol(String dmaapProtocol);
-
-        B dmaapUserName(String dmaapUserName);
-
-        B dmaapUserPassword(String dmaapUserPassword);
-
-        B dmaapContentType(String dmaapContentType);
-
-        T build();
-    }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java
deleted file mode 100644
index 06d20f73..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/config/DmaapPublisherConfiguration.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.config;
-
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- */
-@Value.Immutable(prehash = true)
-@Value.Style(builder = "new")
-@Gson.TypeAdapters
-public abstract class DmaapPublisherConfiguration implements DmaapCustomConfig {
-
-    private static final long serialVersionUID = 1L;
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
index 7249c083..2e9c8488 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
@@ -18,7 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.service;
 
 import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
 
-import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpHeaders;
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
deleted file mode 100644
index c4bf1611..00000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service.consumer;
-
-import java.net.URI;
-import java.util.function.Consumer;
-
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public class DmaapConsumerReactiveHttpClient {
-
-    private WebClient webClient;
-    private final String dmaapHostName;
-    private final String dmaapProtocol;
-    private final Integer dmaapPortNumber;
-    private final String dmaapTopicName;
-    private final String consumerGroup;
-    private final String consumerId;
-    private final String contentType;
-
-    /**
-     * Constructor of DmaapConsumerReactiveHttpClient.
-     *
-     * @param consumerConfiguration - DMaaP consumer configuration object
-     */
-    public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
-        this.dmaapHostName = consumerConfiguration.dmaapHostName();
-        this.dmaapProtocol = consumerConfiguration.dmaapProtocol();
-        this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber();
-        this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
-        this.consumerGroup = consumerConfiguration.consumerGroup();
-        this.consumerId = consumerConfiguration.consumerId();
-        this.contentType = consumerConfiguration.dmaapContentType();
-    }
-
-    /**
-     * Function for calling DMaaP HTTP consumer - consuming messages from Kafka/DMaaP from topic.
-     *
-     * @return reactive response from DMaaP in string format
-     */
-    public Mono<String> getDmaapConsumerResponse() {
-        return webClient.get().uri(getUri()).headers(getHeaders()).retrieve()
-                .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception("HTTP 400")))
-                .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500")))
-                .bodyToMono(String.class);
-    }
-
-    private Consumer<HttpHeaders> getHeaders() {
-        return httpHeaders -> httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
-    }
-
-    private String createRequestPath() {
-        return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
-    }
-
-    public DmaapConsumerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
-        this.webClient = webClient;
-        return this;
-    }
-
-    URI getUri() {
-        return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
-                .path(createRequestPath()).build();
-    }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 0e95b0b0..a4b37c58 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -36,13 +36,14 @@ import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.apache.http.impl.nio.client.HttpAsyncClients;
 import org.apache.http.ssl.SSLContextBuilder;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+
 import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
 import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
 import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpHeaders;
@@ -141,7 +142,7 @@ public class DmaapProducerReactiveHttpClient {
 
     private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
         put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
-        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+        JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(model));
         String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
         metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
         put.addHeader(X_ATT_DR_META, metaData.toString());
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
index 2eac5899..128f78f5 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
@@ -25,7 +25,9 @@ import static org.mockito.Mockito.when;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
+
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
 import org.springframework.web.reactive.function.client.WebClient;
 
 class DmaapReactiveWebClientTest {
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapConsumerConfigurationTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapConsumerConfigurationTest.java
deleted file mode 100644
index b67946b2..00000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapConsumerConfigurationTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service.config;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
-
-public class DmaapConsumerConfigurationTest {
-
-    @Test
-    public void builder_shouldBuildConfigurationObject() {
-
-        // Given
-        DmaapConsumerConfiguration configuration;
-        String consumerId = "1";
-        String dmaapHostName = "localhost";
-        Integer dmaapPortNumber = 2222;
-        String dmaapTopicName = "temp";
-        String dmaapProtocol = "http";
-        String dmaapUserName = "admin";
-        String dmaapUserPassword = "admin";
-        String dmaapContentType = "application/json";
-        String consumerGroup = "other";
-        Integer timeoutMs = 1000;
-        Integer messageLimit = 1000;
-
-        // When
-        configuration = new ImmutableDmaapConsumerConfiguration.Builder().consumerId(consumerId)
-                .dmaapHostName(dmaapHostName).dmaapPortNumber(dmaapPortNumber).dmaapTopicName(dmaapTopicName)
-                .dmaapProtocol(dmaapProtocol).dmaapUserName(dmaapUserName).dmaapUserPassword(dmaapUserPassword)
-                .dmaapContentType(dmaapContentType).consumerGroup(consumerGroup).timeoutMS(timeoutMs)
-                .messageLimit(messageLimit).build();
-
-        // Then
-        Assertions.assertNotNull(configuration);
-        Assertions.assertEquals(consumerId, configuration.consumerId());
-        Assertions.assertEquals(dmaapHostName, configuration.dmaapHostName());
-        Assertions.assertEquals(dmaapPortNumber, configuration.dmaapPortNumber());
-        Assertions.assertEquals(dmaapTopicName, configuration.dmaapTopicName());
-        Assertions.assertEquals(dmaapProtocol, configuration.dmaapProtocol());
-        Assertions.assertEquals(dmaapUserName, configuration.dmaapUserName());
-        Assertions.assertEquals(dmaapUserPassword, configuration.dmaapUserPassword());
-        Assertions.assertEquals(consumerGroup, configuration.consumerGroup());
-        Assertions.assertEquals(timeoutMs, configuration.timeoutMS());
-        Assertions.assertEquals(messageLimit, configuration.messageLimit());
-    }
-}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapPublisherConfigurationTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapPublisherConfigurationTest.java
deleted file mode 100644
index fb8e8751..00000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/config/DmaapPublisherConfigurationTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service.config;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
-
-public class DmaapPublisherConfigurationTest {
-
-
-    @Test
-    public void builder_shouldBuildConfigurationObject() {
-
-        // Given
-        DmaapPublisherConfiguration configuration;
-        String dmaapHostName = "localhost";
-        Integer dmaapPortNumber = 2222;
-        String dmaapTopicName = "temp";
-        String dmaapProtocol = "http";
-        String dmaapUserName = "admin";
-        String dmaapUserPassword = "admin";
-        String dmaapContentType = "application/json";
-
-        // When
-        configuration = new ImmutableDmaapPublisherConfiguration.Builder().dmaapHostName(dmaapHostName)
-                .dmaapPortNumber(dmaapPortNumber).dmaapTopicName(dmaapTopicName).dmaapProtocol(dmaapProtocol)
-                .dmaapUserName(dmaapUserName).dmaapUserPassword(dmaapUserPassword).dmaapContentType(dmaapContentType)
-                .build();
-
-        // Then
-        Assertions.assertNotNull(configuration);
-        Assertions.assertEquals(dmaapHostName, configuration.dmaapHostName());
-        Assertions.assertEquals(dmaapPortNumber, configuration.dmaapPortNumber());
-        Assertions.assertEquals(dmaapTopicName, configuration.dmaapTopicName());
-        Assertions.assertEquals(dmaapProtocol, configuration.dmaapProtocol());
-        Assertions.assertEquals(dmaapUserName, configuration.dmaapUserName());
-        Assertions.assertEquals(dmaapUserPassword, configuration.dmaapUserPassword());
-    }
-}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java
deleted file mode 100644
index 4568bdde..00000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service.consumer;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.http.HttpHeaders;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.RequestHeadersUriSpec;
-import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
-
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/27/18
- */
-class DmaapConsumerReactiveHttpClientTest {
-
-    private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
-
-    private DmaapConsumerConfiguration consumerConfigurationMock = mock(DmaapConsumerConfiguration.class);
-    private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}";
-    private Mono<String> expectedResult = Mono.empty();
-    private WebClient webClient;
-    private RequestHeadersUriSpec requestHeadersSpecMock;
-    private ResponseSpec responseSpecMock;
-
-
-    @BeforeEach
-    void setUp() {
-        when(consumerConfigurationMock.dmaapHostName()).thenReturn("54.45.33.2");
-        when(consumerConfigurationMock.dmaapProtocol()).thenReturn("https");
-        when(consumerConfigurationMock.dmaapPortNumber()).thenReturn(1234);
-        when(consumerConfigurationMock.dmaapUserName()).thenReturn("DATAFILE");
-        when(consumerConfigurationMock.dmaapUserPassword()).thenReturn("DATFILE");
-        when(consumerConfigurationMock.dmaapContentType()).thenReturn("application/json");
-        when(consumerConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.VES_NOTIFICATION_OUTPUT");
-        when(consumerConfigurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
-        when(consumerConfigurationMock.consumerId()).thenReturn("c12");
-
-        dmaapConsumerReactiveHttpClient = new DmaapConsumerReactiveHttpClient(consumerConfigurationMock);
-        webClient = spy(WebClient.builder()
-            .defaultHeader(HttpHeaders.CONTENT_TYPE, consumerConfigurationMock.dmaapContentType())
-            .filter(basicAuthentication(consumerConfigurationMock.dmaapUserName(),
-                consumerConfigurationMock.dmaapUserPassword()))
-            .build());
-        requestHeadersSpecMock = mock(RequestHeadersUriSpec.class);
-        responseSpecMock = mock(ResponseSpec.class);
-    }
-
-
-    @Test
-    void getHttpResponse_Success() {
-        //given
-        expectedResult = Mono.just(JSON_MESSAGE);
-
-        //when
-        mockDependantObjects();
-        doReturn(expectedResult).when(responseSpecMock).bodyToMono(String.class);
-        dmaapConsumerReactiveHttpClient.createDmaapWebClient(webClient);
-
-        Mono<String> response = dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse();
-
-        //then
-        StepVerifier.create(response).expectSubscription()
-            .expectNextMatches(results -> {
-                Assertions.assertEquals(results, expectedResult.block());
-                return true;
-            }).verifyComplete();
-    }
-
-    @Test
-    void getAppropriateUri_whenPassingCorrectedUriData() throws URISyntaxException {
-        Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(),
-            URI.create("https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDCAE-c12/c12"));
-    }
-
-    private void mockDependantObjects() {
-        when(webClient.get()).thenReturn(requestHeadersSpecMock);
-        when(requestHeadersSpecMock.uri((URI) any())).thenReturn(requestHeadersSpecMock);
-        when(requestHeadersSpecMock.headers(any())).thenReturn(requestHeadersSpecMock);
-        when(requestHeadersSpecMock.retrieve()).thenReturn(responseSpecMock);
-        doReturn(responseSpecMock).when(responseSpecMock).onStatus(any(), any());
-    }
-
-}
\ No newline at end of file
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index bf2f73d6..3fbd57cc 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -41,12 +41,13 @@ import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+
 import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.springframework.http.HttpHeaders;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
@@ -127,7 +128,7 @@ class DmaapProducerReactiveHttpClientTest {
         HttpPut httpPut = new HttpPut();
         httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
 
-        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
+        JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(consumerDmaapModel));
         metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
         metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
         httpPut.addHeader(X_ATT_DR_META, metaData.toString());
diff --git a/pom.xml b/pom.xml
index ecf1add7..9bae86b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <groupId>org.onap.oparent</groupId>
     <artifactId>oparent</artifactId>
-    <version>1.2.0</version>
+    <version>1.2.1</version>
     <relativePath />
   </parent>
 
@@ -51,7 +51,7 @@
     <tomcat.version>8.5.34</tomcat.version>
     <docker.maven.version>1.0.0</docker.maven.version>
     <resource.maven.plugin.version>3.1.0</resource.maven.plugin.version>
-    <sdk.version>1.1.0-SNAPSHOT</sdk.version>
+    <sdk.version>1.1.1-SNAPSHOT</sdk.version>
 
     <!-- LOGGING SETTINGS -->
     <slf4j.version>1.7.25</slf4j.version>
@@ -130,6 +130,13 @@
             <skip>true</skip>
           </configuration>
         </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-enforcer-plugin</artifactId>
+          <configuration>
+            <skip>true</skip>
+          </configuration>
+        </plugin>
       </plugins>
     </pluginManagement>
   </build>
@@ -141,6 +148,16 @@
         <artifactId>cbs-client</artifactId>
         <version>${sdk.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+        <artifactId>dmaap-client</artifactId>
+        <version>${sdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+        <artifactId>common-dependency</artifactId>
+        <version>${sdk.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.asynchttpclient</groupId>
         <artifactId>async-http-client</artifactId>
-- 
cgit 1.2.3-korg