aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README.md34
-rw-r--r--pom.xml16
-rw-r--r--sdc-distribution-ci/etc/sdc-client.jks (renamed from sdc-distribution-ci/etc/asdc-client.jks)bin1177 -> 1177 bytes
-rw-r--r--sdc-distribution-ci/etc/sdcclientstore.jks (renamed from sdc-distribution-ci/etc/asdcclientstore.jks)bin907 -> 907 bytes
-rw-r--r--sdc-distribution-ci/pom.xml22
-rw-r--r--sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java75
-rw-r--r--sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java6
-rw-r--r--sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java (renamed from sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java)19
-rw-r--r--sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java131
-rw-r--r--sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java94
-rw-r--r--sdc-distribution-ci/src/test/resources/logback-test.xml14
-rw-r--r--sdc-distribution-client/etc/sdc-client.jks (renamed from sdc-distribution-client/etc/asdc-client.jks)bin1177 -> 1177 bytes
-rw-r--r--sdc-distribution-client/etc/sdcclientstore.jks (renamed from sdc-distribution-client/etc/asdcclientstore.jks)bin907 -> 907 bytes
-rw-r--r--sdc-distribution-client/pom.xml81
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/api/IDistributionClient.java21
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/RegistrationRequest.java56
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java50
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/INotificationData.java4
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/IVfModuleMetadata.java2
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpClientFactory.java2
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClient.java (renamed from sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClient.java)42
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClientException.java (renamed from sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClientException.java)4
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcResponse.java (renamed from sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcResponse.java)6
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpSdcClient.java (renamed from sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpAsdcClient.java)6
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java222
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcUrls.java (renamed from sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/ServerListResponse.java)17
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/http/TopicRegistrationResponse.java43
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java84
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java30
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java482
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java4
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java72
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java29
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java22
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java2
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java20
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java43
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/KafkaDataResponse.java (renamed from sdc-distribution-client/src/main/java/org/onap/sdc/http/AsdcUrls.java)25
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java100
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaProducer.java98
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/api/asdc/RegistrationRequestTest.java45
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientResponseTest.java9
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientTest.java35
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpClientFactoryTest.java6
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/http/SdcConnectorClientTest.java263
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/impl/DistributionClientTest.java325
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/impl/NotificationConsumerTest.java656
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java94
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java104
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java98
-rw-r--r--sdc-distribution-client/src/test/resources/jaas.conf20
-rw-r--r--sdc-distribution-client/src/test/resources/log4j.xml26
-rw-r--r--sdc-distribution-client/src/test/resources/sdc-client.jks (renamed from sdc-distribution-client/src/test/resources/asdc-client.jks)bin1177 -> 1177 bytes
-rw-r--r--version.properties6
54 files changed, 1739 insertions, 1926 deletions
diff --git a/README.md b/README.md
index 8a22140..f975823 100644
--- a/README.md
+++ b/README.md
@@ -23,18 +23,18 @@ Every client that wants to use the JAR, need to implement IConfiguration interfa
Configuration parameters:
--------------------------
-- AsdcAddress : ASDC Distribution Engine address. Value can be either hostname (with or without port), IP:port or FQDN (Fully Qualified Domain Name).
-- User : User Name for ASDC distribution consumer authentication.
-- Password : User Password for ASDC distribution consumer authentication.
-- PollingInterval : Distribution Client Polling Interval towards UEB in seconds. Can Be reconfigured in runtime.
-- PollingTimeout : Distribution Client Timeout in seconds waiting to UEB server response in each fetch interval. Can Be reconfigured in runtime.
+- sdcAddress : SDC Distribution Engine address. Value can be either hostname (with or without port), IP:port or FQDN (Fully Qualified Domain Name).
+- User : User Name for SDC distribution consumer authentication.
+- Password : User Password for SDC distribution consumer authentication.
+- PollingInterval : Distribution Client Polling Interval towards MessageBus in seconds. Can Be reconfigured in runtime.
+- PollingTimeout : Distribution Client Timeout in seconds waiting to MessageBus server response in each fetch interval. Can Be reconfigured in runtime.
- RelevantArtifactTypes : List of artifact types. If the service contains any of the artifacts in the list, the callback will be activated. Can Be reconfigured in runtime.
- ConsumerGroup : Returns the consumer group defined for this ONAP component, if no consumer group is defined return null.
-- EnvironmentName : Returns the environment name (testing, production etc... Can Be reconfigured in runtime.
-- ConsumerID : Unique ID of ONAP component instance (e.x INSTAR name).
-- KeyStorePath : Return full path to Client's Key Store that contains either CA certificate or the ASDC's public key (e.g /etc/keystore/asdc-client.jks). file will be deployed with asdc-distribution jar
+- EnvironmentName : Returns the environment name (testing, production etc... Can Be reconfigured in runtime.
+- ConsumerID : Unique ID of ONAP component instance (e.x INSTAR name).
+- KeyStorePath : Return full path to Client's Key Store that contains either CA certificate or the SDC's public key (e.g /etc/keystore/sdc-client.jks). file will be deployed with sdc-distribution jar
- KeyStorePassword : Return client's Key Store password.
-- activateServerTLSAuth : Sets whether ASDC server TLS authentication is activated. If set to false, Key Store path and password are not needed to be set.
+- activateServerTLSAuth : Sets whether SDC server TLS authentication is activated. If set to false, Key Store path and password are not needed to be set.
- UseSystemProxy : If set to true, SDC Distribution Client will use system wide proxy configuration passed through JVM arguments.
- HttpProxyHost : Optional config. If configured, SDC Distribution client will use this http proxy host with HTTP client.
- HttpProxyPort : Mandatory if HttpProxyHost is configured. If configured, SDC Distribution client will use this https proxy port with HTTP client.
@@ -48,16 +48,16 @@ package org.onap.conf;
import java.util.ArrayList;
import java.util.List;
-import org.onap.asdc.api.consumer.IConfiguration;
-import org.onap.asdc.utils.ArtifactTypeEnum;
+import org.onap.sdc.api.consumer.IConfiguration;
+import org.onap.sdc.utils.ArtifactTypeEnum;
public class SimpleConfiguration implements IConfiguration{
int randomSeed;
- String asdcAddress;
+ String sdcAddress;
public SimpleConfiguration(){
randomSeed = ((int)(Math.random()*1000));
- asdcAddress = "127.0.0.1:8443";
+ sdcAddress = "127.0.0.1:8443";
}
public String getUser() {
return "ci";
@@ -95,12 +95,12 @@ public class SimpleConfiguration implements IConfiguration{
return "unique-Consumer-Group"+randomSeed;
}
- public String getAsdcAddress() {
- return asdcAddress;
+ public String getSdcAddress() {
+ return sdcAddress;
}
- public void setAsdcAddress(String asdcAddress) {
- this.asdcAddress = asdcAddress;
+ public void setSdcAddress(String sdcAddress) {
+ this.sdcAddress = sdcAddress;
}
@Override
public String getKeyStorePath() {
diff --git a/pom.xml b/pom.xml
index 9defce1..bb52ff0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>1.4.5-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>sdc-sdc-distribution-client</name>
@@ -21,7 +21,6 @@
</modules>
<properties>
-
<!-- ==================== -->
<!-- Generic properties -->
<!-- ==================== -->
@@ -38,7 +37,7 @@
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.15</httpcore.version>
<logback.version>1.2.11</logback.version>
- <junit.version>5.9.0</junit.version>
+ <junit.version>5.7.2</junit.version>
<snakeyaml.version>1.30</snakeyaml.version>
<guava.version>31.1-jre</guava.version>
<jetty.version>9.4.48.v20220622</jetty.version>
@@ -71,7 +70,6 @@
<maven-site-plugin.version>3.9.1</maven-site-plugin.version>
<wagon-webdav-jackrabbit.version>3.4.2</wagon-webdav-jackrabbit.version>
<jacoco-maven-plugin.version>0.8.6</jacoco-maven-plugin.version>
- <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<maven-compiler-plugin.source>11</maven-compiler-plugin.source>
<maven-compiler-plugin.target>11</maven-compiler-plugin.target>
@@ -129,7 +127,6 @@
<artifactId>umlgraph</artifactId>
<version>${umlgraph.version}</version>
</docletArtifact>
- <additionalparam>-views</additionalparam>
<useStandardDocletOptions>true</useStandardDocletOptions>
</configuration>
</plugin>
@@ -140,7 +137,14 @@
<plugins>
<plugin>
<artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.17</version>
+ <executions>
+ <execution>
+ <id>onap-java-style</id>
+ <configuration>
+ <consoleOutput>false</consoleOutput>
+ </configuration>
+ </execution>
+ </executions>
<configuration>
<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>
<suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression>
diff --git a/sdc-distribution-ci/etc/asdc-client.jks b/sdc-distribution-ci/etc/sdc-client.jks
index eb0a0d3..eb0a0d3 100644
--- a/sdc-distribution-ci/etc/asdc-client.jks
+++ b/sdc-distribution-ci/etc/sdc-client.jks
Binary files differ
diff --git a/sdc-distribution-ci/etc/asdcclientstore.jks b/sdc-distribution-ci/etc/sdcclientstore.jks
index 5dc006d..5dc006d 100644
--- a/sdc-distribution-ci/etc/asdcclientstore.jks
+++ b/sdc-distribution-ci/etc/sdcclientstore.jks
Binary files differ
diff --git a/sdc-distribution-ci/pom.xml b/sdc-distribution-ci/pom.xml
index e11d428..707de83 100644
--- a/sdc-distribution-ci/pom.xml
+++ b/sdc-distribution-ci/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>1.4.5-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>sdc-distribution-ci</artifactId>
@@ -25,6 +25,13 @@
</properties>
<dependencies>
+ <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-distribution-client</artifactId>
@@ -136,6 +143,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>2.8.2</version>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito-core.version}</version>
@@ -181,6 +193,12 @@
<version>${httpclient.version}</version>
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit-pioneer</groupId>
+ <artifactId>junit-pioneer</artifactId>
+ <version>1.4.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -195,7 +213,7 @@
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib</classpathPrefix>
- <mainClass>org.onap.test.it.RegisterToAsdcTopicIT</mainClass>
+ <mainClass>org.onap.test.it.RegisterToSdcTopicIT</mainClass>
</manifest>
<manifestEntries>
<Class-Path>lib/</Class-Path>
diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java b/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java
index ad18969..5a902c8 100644
--- a/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java
+++ b/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java
@@ -26,21 +26,23 @@ import java.util.List;
public class DistributionClientConfig implements IConfiguration {
- public static final String DEFAULT_ASDC_ADDRESS = "localhost:30206";
+ public static final String DEFAULT_SDC_ADDRESS = "localhost:30206";
public static final String DEFAULT_COMSUMER_ID = "dcae-openapi-manager";
public static final String DEFAULT_CONSUMER_GROUP = "noapp";
public static final String DEFAULT_ENVIRONMENT_NAME = "AUTO";
public static final String DEFAULT_PASSWORD = "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U";
public static final int DEFAULT_POLLING_INTERVAL = 20;
public static final int DEFAULT_POLLING_TIMEOUT = 20;
+ public static final String DEFAULT_STATUS_TOPIC = "STATUS-TOPIC";
+ public static final String DEFAULT_NOTIF_TOPIC = "NOTIF-TOPIC";
public static final String DEFAULT_USER = "dcae";
- public static final String DEFAULT_KEY_STORE_PATH = "etc/asdc-client.jks";
+ public static final String DEFAULT_KEY_STORE_PATH = "etc/sdc-client.jks";
public static final String DEFAULT_KEY_STORE_PASSWORD = "Aa123456";
public static final boolean DEFAULT_ACTIVATE_SERVER_TLS_AUTH = false;
public static final boolean DEFAULT_IS_FILTER_IN_EMPTY_RESOURCES = true;
public static final boolean DEFAULT_USE_HTTPS_WITH_SDC = false;
- public static final String DEFAULT_MSG_BUS_ADDRESS = "localhost";
- private String asdcAddress;
+ public static final String DEFAULT_MSG_BUS_ADDRESS = "localhost:9092";
+ private String sdcAddress;
private String user;
private String password;
private int pollingInterval;
@@ -53,38 +55,23 @@ public class DistributionClientConfig implements IConfiguration {
private String keyStorePassword;
private boolean activateServerTLSAuth;
private boolean isFilterInEmptyResources;
- private boolean useHttpsWithDmaap;
private boolean useHttpsWithSDC;
private List<String> msgBusAddress;
+ private String sdcStatusTopicName;
+ private String sdcNotificationTopicName;
+ private String kafkaSecurityProtocolConfig;
+ private String kafkaSaslMechanism;
+ private String kafkaSaslJaasConfig;
private String httpProxyHost;
private int httpProxyPort;
private String httpsProxyHost;
private int httpsProxyPort;
private boolean useSystemProxy;
- public DistributionClientConfig(IConfiguration other) {
- this.asdcAddress = other.getAsdcAddress();
- this.comsumerID = other.getConsumerID();
- this.consumerGroup = other.getConsumerGroup();
- this.environmentName = other.getEnvironmentName();
- this.password = other.getPassword();
- this.pollingInterval = other.getPollingInterval();
- this.pollingTimeout = other.getPollingTimeout();
- this.relevantArtifactTypes = other.getRelevantArtifactTypes();
- this.user = other.getUser();
- this.keyStorePath = other.getKeyStorePath();
- this.keyStorePassword = other.getKeyStorePassword();
- this.activateServerTLSAuth = other.activateServerTLSAuth();
- this.isFilterInEmptyResources = other.isFilterInEmptyResources();
- this.httpProxyHost = other.getHttpProxyHost();
- this.httpProxyPort = other.getHttpProxyPort();
- this.httpsProxyHost = other.getHttpsProxyHost();
- this.httpsProxyPort = other.getHttpsProxyPort();
- this.useSystemProxy = other.isUseSystemProxy();
- }
-
public DistributionClientConfig() {
- this.asdcAddress = DEFAULT_ASDC_ADDRESS;
+ this.sdcAddress = DEFAULT_SDC_ADDRESS;
+ this.sdcStatusTopicName = DEFAULT_STATUS_TOPIC;
+ this.sdcNotificationTopicName = DEFAULT_NOTIF_TOPIC;
this.comsumerID = DEFAULT_COMSUMER_ID;
this.consumerGroup = DEFAULT_CONSUMER_GROUP;
this.environmentName = DEFAULT_ENVIRONMENT_NAME;
@@ -99,22 +86,23 @@ public class DistributionClientConfig implements IConfiguration {
this.activateServerTLSAuth = DEFAULT_ACTIVATE_SERVER_TLS_AUTH;
this.isFilterInEmptyResources = DEFAULT_IS_FILTER_IN_EMPTY_RESOURCES;
this.useHttpsWithSDC = DEFAULT_USE_HTTPS_WITH_SDC;
- msgBusAddress = new ArrayList<>();
- msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS);
- msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS);
- msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS);
+ this.msgBusAddress = new ArrayList<>();
+ this.msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS);
}
@Override
- public String getAsdcAddress() {
- return asdcAddress;
+ public String getSdcAddress() {
+ return sdcAddress;
}
- @Override
public List<String> getMsgBusAddress() {
return msgBusAddress;
}
+ public void setMsgBusAddress(List<String> msgBusAddress) {
+ this.msgBusAddress = msgBusAddress;
+ }
+
@Override
public String getUser() {
return user;
@@ -173,8 +161,8 @@ public class DistributionClientConfig implements IConfiguration {
this.comsumerID = comsumerID;
}
- public void setAsdcAddress(String asdcAddress) {
- this.asdcAddress = asdcAddress;
+ public void setSdcAddress(String sdcAddress) {
+ this.sdcAddress = sdcAddress;
}
public void setUser(String user) {
@@ -217,7 +205,7 @@ public class DistributionClientConfig implements IConfiguration {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((asdcAddress == null) ? 0 : asdcAddress.hashCode());
+ result = prime * result + ((sdcAddress == null) ? 0 : sdcAddress.hashCode());
result = prime * result + ((comsumerID == null) ? 0 : comsumerID.hashCode());
result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
result = prime * result + ((environmentName == null) ? 0 : environmentName.hashCode());
@@ -251,11 +239,11 @@ public class DistributionClientConfig implements IConfiguration {
return false;
}
DistributionClientConfig other = (DistributionClientConfig) obj;
- if (asdcAddress == null) {
- if (other.asdcAddress != null) {
+ if (sdcAddress == null) {
+ if (other.sdcAddress != null) {
return false;
}
- } else if (!asdcAddress.equals(other.asdcAddress)) {
+ } else if (!sdcAddress.equals(other.sdcAddress)) {
return false;
}
if (comsumerID == null) {
@@ -322,7 +310,7 @@ public class DistributionClientConfig implements IConfiguration {
@Override
public String toString() {
- return "TestConfiguration [asdcAddress=" + asdcAddress + ", user=" + user + ", password=" + password
+ return "TestConfiguration [sdcAddress=" + sdcAddress + ", user=" + user + ", password=" + password
+ ", pollingInterval=" + pollingInterval + ", pollingTimeout=" + pollingTimeout
+ ", relevantArtifactTypes=" + relevantArtifactTypes + ", consumerGroup=" + consumerGroup
+ ", environmentName=" + environmentName + ", comsumerID=" + comsumerID + "]";
@@ -337,11 +325,6 @@ public class DistributionClientConfig implements IConfiguration {
this.isFilterInEmptyResources = isFilterInEmptyResources;
}
- @Override
- public Boolean isUseHttpsWithDmaap() {
- return this.useHttpsWithDmaap;
- }
-
public Boolean isUseHttpsWithSDC() {
return this.useHttpsWithSDC;
}
diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java b/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java
index 4dfe388..4cee4cf 100644
--- a/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java
+++ b/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java
@@ -23,7 +23,7 @@ import org.onap.sdc.api.consumer.IDistributionStatusMessage;
import org.onap.sdc.api.consumer.INotificationCallback;
import org.onap.sdc.api.notification.INotificationData;
import org.onap.sdc.api.notification.IResourceInstance;
-import org.onap.sdc.http.HttpAsdcClient;
+import org.onap.sdc.http.HttpSdcClient;
import org.onap.sdc.http.SdcConnectorClient;
import org.onap.sdc.impl.DistributionClientDownloadResultImpl;
import org.onap.sdc.impl.DistributionClientImpl;
@@ -46,8 +46,8 @@ public class ClientNotifyCallback implements INotificationCallback {
private final DistributionClientImpl distributionClient;
private final List<DistributionClientDownloadResultImpl> pulledArtifacts = new ArrayList<>();
DistributionClientConfig config = new DistributionClientConfig();
- HttpAsdcClient asdcClient = new HttpAsdcClient(config);
- SdcConnectorClient sdcConnectorClient = new SdcConnectorClient(config, asdcClient);
+ HttpSdcClient sdcClient = new HttpSdcClient(config);
+ SdcConnectorClient sdcConnectorClient = new SdcConnectorClient(config, sdcClient);
ArtifactsDownloader artifactsDownloader = new ArtifactsDownloader("/app/path", sdcConnectorClient);
public List<DistributionClientDownloadResultImpl> getPulledArtifacts() {
diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java b/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java
index 58baec7..c89ecd4 100644
--- a/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java
+++ b/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java
@@ -19,18 +19,15 @@
*/
package org.onap.test.it;
+import java.util.ArrayList;
+import java.util.List;
import org.onap.sdc.impl.DistributionClientImpl;
import org.onap.test.core.config.DistributionClientConfig;
import org.onap.test.core.service.ArtifactsValidator;
import org.onap.test.core.service.ClientInitializer;
import org.onap.test.core.service.ClientNotifyCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-public class RegisterToAsdcTopicIT {
+public class RegisterToSdcTopicIT {
public static void main(String[] args) {
@@ -40,12 +37,10 @@ public class RegisterToAsdcTopicIT {
ClientNotifyCallback callback = new ClientNotifyCallback(validators, client);
ClientInitializer clientInitializer = new ClientInitializer(clientConfig, callback, client);
clientInitializer.initialize();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- client.stop();
- System.out.println("Shutdown Hook is running !");
- }
- });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ client.stop();
+ System.out.println("Shutdown Hook is running !");
+ }));
}
}
diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
index ba118c0..1abef1f 100644
--- a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
+++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java
@@ -24,18 +24,35 @@ import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.verify;
import java.io.IOException;
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.nio.file.Paths;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
-import org.awaitility.Durations;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SaslConfigs;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -46,84 +63,52 @@ import org.onap.test.core.config.DistributionClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
-
+import org.testcontainers.shaded.org.awaitility.Durations;
+import org.testcontainers.utility.DockerImageName;
@Testcontainers
@ExtendWith(MockitoExtension.class)
+@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';")
+@SetEnvironmentVariable(key = "SECURITY_PROTOCOL", value = "SASL_PLAINTEXT")
+@SetEnvironmentVariable(key = "SASL_MECHANISM", value = "PLAIN")
class ClientInitializerTest {
- private static final int SUCCESSFUL_STOP_MSG_INDEX = 2;
- private static final int SUCCESSFUL_UNREGISTER_MSG_INDEX = 3;
- private static final int SUCCESSFUL_INIT_MSG_INDEX = 0;
- private static final int SUCCESSFUL_DIST_MSG_INDEX = 3;
private static final int EXPECTED_HEAT_ARTIFACTS = 4;
+ private static final DistributionClientConfig clientConfig = new DistributionClientConfig();
private static final Logger testLog = LoggerFactory.getLogger(ClientInitializerTest.class);
@Container
- public GenericContainer mockDmaap = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-dmaap:latest")
- .withNetworkMode("host");
+ CustomKafkaContainer kafka = buildBrokerInstance();
@Container
- public GenericContainer mockSdc = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:latest")
- .withNetworkMode("host");
- @Mock
- private Logger log;
+ public GenericContainer<?> mockSdc =
+ new GenericContainer<>(
+ "registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:develop")
+ .withExposedPorts(30206);
@Mock
private Logger distClientLog;
private ClientInitializer clientInitializer;
private ClientNotifyCallback clientNotifyCallback;
@BeforeEach
- public void initializeClient() {
- DistributionClientConfig clientConfig = new DistributionClientConfig();
+ public void initializeClient() throws InterruptedException {
+ clientConfig.setSdcAddress(mockSdc.getHost()+":"+mockSdc.getFirstMappedPort());
List<ArtifactsValidator> validators = new ArrayList<>();
DistributionClientImpl client = new DistributionClientImpl(distClientLog);
clientNotifyCallback = new ClientNotifyCallback(validators, client);
clientInitializer = new ClientInitializer(clientConfig, clientNotifyCallback, client);
- }
-
- @Test
- void shouldRegisterToDmaapAfterClientInitialization() {
- //given
- final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class);
- //when
- clientInitializer.log = log;
- clientInitializer.initialize();
- verify(log, Mockito.atLeastOnce()).info(exceptionCaptor.capture());
- List<String> allValues = exceptionCaptor.getAllValues();
- //then
- assertThat(allValues.get(SUCCESSFUL_INIT_MSG_INDEX)).isEqualTo("distribution client initialized successfuly");
- assertThat(allValues.get(SUCCESSFUL_DIST_MSG_INDEX)).isEqualTo("distribution client started successfuly");
- }
-
- @Test
- void shouldUnregisterAndStopClient() {
- //given
- final ArgumentCaptor<String> exceptionCaptor = ArgumentCaptor.forClass(String.class);
- //when
clientInitializer.initialize();
- clientInitializer.stop();
- verify(distClientLog, Mockito.atLeastOnce()).info(exceptionCaptor.capture());
- List<String> allValues = exceptionCaptor.getAllValues();
- //then
- assertThat(allValues.get(SUCCESSFUL_STOP_MSG_INDEX)).isEqualTo("stop DistributionClient");
- assertThat(allValues.get(SUCCESSFUL_UNREGISTER_MSG_INDEX)).isEqualTo("client unregistered from topics successfully");
+ Thread.sleep(1000);
+ setUpTopicsAndSendData();
}
@Test
- void shouldDownloadArtifactsWithArtifactTypeHeat() throws IOException, InterruptedException {
-
- //given
- HttpRequest request = HttpRequest.newBuilder()
- .uri(URI.create("http://localhost:3904/events/testName/add"))
- .POST(HttpRequest.BodyPublishers.ofFile(Paths.get("src/test/resources/artifacts.json")))
- .build();
- HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
- //when
- clientInitializer.initialize();
+ void shouldDownloadArtifactsWithArtifactTypeHeat() {
waitForArtifacts();
List<DistributionClientDownloadResultImpl> calls = clientNotifyCallback.getPulledArtifacts();
- //then
Assertions.assertEquals(EXPECTED_HEAT_ARTIFACTS, calls.size());
+ clientInitializer.stop();
}
private void waitForArtifacts() {
@@ -132,4 +117,36 @@ class ClientInitializerTest {
.atMost(Durations.ONE_MINUTE)
.until(() -> !clientNotifyCallback.getPulledArtifacts().isEmpty());
}
+
+ private CustomKafkaContainer buildBrokerInstance() {
+ final Map<String, String> env = new LinkedHashMap<>();
+ env.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT");
+ env.put("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN");
+ env.put("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+ "username=\"admin\" " +
+ "password=\"admin-secret\" " +
+ "user_admin=\"admin-secret\" " +
+ "user_producer=\"producer-secret\" " +
+ "user_consumer=\"consumer-secret\";");
+
+ return new CustomKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+ .withEmbeddedZookeeper()
+ .withStartupAttempts(1)
+ .withEnv(env)
+ .withFixedExposedPort(43219, 9093);
+ }
+
+ @SneakyThrows
+ private void setUpTopicsAndSendData() {
+ Properties props = new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+ String content = Files.readString(Path.of("src/test/resources/artifacts.json"));
+ producer.send(new ProducerRecord<>("SDC-DIST-NOTIF-TOPIC", "testcontainers", content)).get();
+ }
}
diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java
new file mode 100644
index 0000000..e2eabc1
--- /dev/null
+++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.test.core.service;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import lombok.SneakyThrows;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class CustomKafkaContainer extends FixedHostPortGenericContainer<CustomKafkaContainer> {
+ protected String externalZookeeperConnect;
+
+ public CustomKafkaContainer(DockerImageName dockerImageName) {
+ super(String.valueOf(dockerImageName));
+ this.externalZookeeperConnect = null;
+ this.withExposedPorts(9093);
+ this.withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092");
+ this.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
+ this.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
+ this.withEnv("KAFKA_BROKER_ID", "1");
+ this.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
+ this.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
+ this.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
+ this.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
+ this.withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "9223372036854775807");
+ this.withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
+ }
+
+ public CustomKafkaContainer withEmbeddedZookeeper() {
+ this.externalZookeeperConnect = null;
+ return this.self();
+ }
+
+ public String getBootstrapServers() {
+ return String.format("PLAINTEXT://%s:%s", this.getHost(), this.getMappedPort(9093));
+ }
+
+ protected void configure() {
+ this.withEnv("KAFKA_ADVERTISED_LISTENERS", String.format("BROKER://%s:9092", this.getNetwork() != null ? this.getNetworkAliases().get(0) : "localhost"));
+ String command = "";
+ if (this.externalZookeeperConnect != null) {
+ this.withEnv("KAFKA_ZOOKEEPER_CONNECT", this.externalZookeeperConnect);
+ } else {
+ this.addExposedPort(2181);
+ this.withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181");
+ command = command + "echo 'clientPort=2181' > zookeeper.properties\n";
+ command = command + "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
+ command = command + "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
+ command = command + "zookeeper-server-start zookeeper.properties &\n";
+ }
+
+ command = command + "echo '' > /etc/confluent/docker/ensure \n";
+ command = command + "/etc/confluent/docker/run \n";
+ this.withCommand("sh", "-c", command);
+ }
+
+ @SneakyThrows
+ protected void containerIsStarted(InspectContainerResponse containerInfo) {
+ try {
+ String brokerAdvertisedListener = this.brokerAdvertisedListener(containerInfo);
+ ExecResult result = this.execInContainer("kafka-configs", "--alter", "--bootstrap-server",
+ brokerAdvertisedListener, "--entity-type", "brokers", "--entity-name",
+ this.getEnvMap().get("KAFKA_BROKER_ID"), "--add-config",
+ "advertised.listeners=[" + String.join(",", this.getBootstrapServers(), brokerAdvertisedListener) + "]");
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException(result.toString());
+ }
+ } catch (Throwable var4) {
+ throw var4;
+ }
+ }
+
+ protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
+ return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
+ }
+}
diff --git a/sdc-distribution-ci/src/test/resources/logback-test.xml b/sdc-distribution-ci/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..78c5b05
--- /dev/null
+++ b/sdc-distribution-ci/src/test/resources/logback-test.xml
@@ -0,0 +1,14 @@
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+ <logger name="org.testcontainers" level="INFO"/>
+ <logger name="com.github.dockerjava" level="WARN"/>
+</configuration> \ No newline at end of file
diff --git a/sdc-distribution-client/etc/asdc-client.jks b/sdc-distribution-client/etc/sdc-client.jks
index eb0a0d3..eb0a0d3 100644
--- a/sdc-distribution-client/etc/asdc-client.jks
+++ b/sdc-distribution-client/etc/sdc-client.jks
Binary files differ
diff --git a/sdc-distribution-client/etc/asdcclientstore.jks b/sdc-distribution-client/etc/sdcclientstore.jks
index 5dc006d..5dc006d 100644
--- a/sdc-distribution-client/etc/asdcclientstore.jks
+++ b/sdc-distribution-client/etc/sdcclientstore.jks
Binary files differ
diff --git a/sdc-distribution-client/pom.xml b/sdc-distribution-client/pom.xml
index ef0911c..3d1ebbd 100644
--- a/sdc-distribution-client/pom.xml
+++ b/sdc-distribution-client/pom.xml
@@ -3,17 +3,10 @@
<modelVersion>4.0.0</modelVersion>
- <properties>
- <commons-io.version>2.8.0</commons-io.version>
- <gson.version>2.8.9</gson.version>
- <cambriaClient.version>0.0.1</cambriaClient.version>
- <lombok.version>1.18.24</lombok.version>
- </properties>
-
<parent>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>1.4.5-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>sdc-distribution-client</artifactId>
@@ -21,7 +14,38 @@
<description>Distribution client JAR file to use by consumers</description>
<packaging>jar</packaging>
+ <properties>
+ <assertj-core.version>3.18.1</assertj-core.version>
+ <mockito-all.version>3.6.28</mockito-all.version>
+ <commons-io.version>2.8.0</commons-io.version>
+ <slf4j-api.version>1.7.30</slf4j-api.version>
+ <jackson.version>2.5.2</jackson.version>
+ <kafka.version>3.3.1</kafka.version>
+ <gson.version>2.8.9</gson.version>
+ <lombok.version>1.18.24</lombok.version>
+ </properties>
<dependencies>
+ <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@@ -36,13 +60,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-api.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.att.nsa</groupId>
- <artifactId>cambriaClient</artifactId>
- <version>${cambriaClient.version}</version>
- <scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.json</groupId>
@@ -128,7 +145,6 @@
</exclusions>
<version>${jetty.version}</version>
</dependency>
-
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
@@ -141,7 +157,6 @@
</exclusion>
</exclusions>
</dependency>
-
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
@@ -162,12 +177,17 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.junit-pioneer</groupId>
+ <artifactId>junit-pioneer</artifactId>
+ <version>1.4.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>com.google.code.bean-matchers</groupId>
<artifactId>bean-matchers</artifactId>
@@ -180,26 +200,35 @@
</exclusion>
</exclusions>
</dependency>
-
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
-
<dependency>
- <groupId>org.awaitility</groupId>
- <artifactId>awaitility</artifactId>
- <version>${awaitility.version}</version>
+ <groupId>io.github.hakky54</groupId>
+ <artifactId>logcaptor</artifactId>
+ <version>2.7.10</version>
<scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.salesforce.kafka.test</groupId>
+ <artifactId>kafka-junit5</artifactId>
+ <version>3.2.4</version>
<exclusions>
<exclusion>
- <artifactId>objenesis</artifactId>
- <groupId>org.objenesis</groupId>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-streams</artifactId>
</exclusion>
</exclusions>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.13</artifactId>
+ <version>${kafka.version}</version>
+ <scope>test</scope>
</dependency>
</dependencies>
-
</project>
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/IDistributionClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/IDistributionClient.java
index fe37dc1..ac48419 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/IDistributionClient.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/api/IDistributionClient.java
@@ -33,6 +33,11 @@ import org.onap.sdc.api.results.IDistributionClientResult;
import org.onap.sdc.api.notification.IArtifactInfo;
import org.onap.sdc.api.notification.IVfModuleMetadata;
+/**
+ Client for sending/receiving notifications/status related to distributions from SDC.
+ This client uses Kafka for communication with the topics.
+ For communication using DMAAP MR use latest version with major version = 1 (e.g. 1.4.5)
+ **/
public interface IDistributionClient {
/**
@@ -63,15 +68,13 @@ public interface IDistributionClient {
/**
* Stop distribution client <br>
* - stop polling notification topic <br>
- * - unregister topics (via ASDC) <br>
- * - delete keys from UEB
*
* @return IDistributionClientResult
*/
IDistributionClientResult stop();
/**
- * Downloads an artifact from ASDC Catalog <br>
+ * Downloads an artifact from SDC Catalog <br>
*
* @param artifactInfo
* @return IDistributionClientDownloadResult
@@ -80,10 +83,10 @@ public interface IDistributionClient {
/**
* Initialize the distribution client <br>
- * - fetch the UEB server list from ASDC <br>
- * - create keys in UEB <br>
- * - register for topics (via ASDC) <br>
+ * - get MessageBus server list from configuration <br>
+ * - validate artifact types against sdc server <br>
* - set the notification callback <br>
+ * - set up notification sender <br>
* <p>
* Note: all configuration fields are mandatory. <br>
* Password must be in clear text and not encrypted <br>
@@ -98,10 +101,10 @@ public interface IDistributionClient {
/**
* Initialize the distribution client <br>
- * - fetch the UEB server list from ASDC <br>
- * - create keys in UEB <br>
- * - register for topics (via ASDC) <br>
+ * - get MessageBus server list from configuration <br>
+ * - validate artifact types against sdc server <br>
* - set the notification callback <br>
+ * - set up notification sender <br>
* <p>
* Note: all configuration fields are mandatory. <br>
* Password must be in clear text and not encrypted <br>
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/RegistrationRequest.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/RegistrationRequest.java
deleted file mode 100644
index 765da4c..0000000
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/RegistrationRequest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * sdc-distribution-client
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.sdc.api.asdc;
-
-import java.util.List;
-
-public class RegistrationRequest {
-
- private String apiPublicKey;
- private String distrEnvName;
- private Boolean isConsumerToSdcDistrStatusTopic;
- private List<String> distEnvEndPoints;
-
- public RegistrationRequest(String apiPublicKey, String distrEnvName, boolean isConsumerToSdcDistrStatusTopic, List<String> distEnvEndPoints) {
- this.apiPublicKey = apiPublicKey;
- this.distrEnvName = distrEnvName;
- this.isConsumerToSdcDistrStatusTopic = isConsumerToSdcDistrStatusTopic;
- this.distEnvEndPoints = distEnvEndPoints;
- }
-
- public String getApiPublicKey() {
- return apiPublicKey;
- }
-
- public String getDistrEnvName() {
- return distrEnvName;
- }
-
- public Boolean getIsConsumerToSdcDistrStatusTopic() {
- return isConsumerToSdcDistrStatusTopic;
- }
-
- public List<String> getDistEnvEndPoints() {
- return distEnvEndPoints;
- }
-
-
-}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java
index e3013e2..84eb42b 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java
@@ -22,6 +22,7 @@ package org.onap.sdc.api.consumer;
import java.util.List;
+import org.apache.kafka.common.KafkaException;
import org.onap.sdc.api.notification.INotificationData;
public interface IConfiguration {
@@ -30,7 +31,7 @@ public interface IConfiguration {
* without port), IP:port or FQDN (Fully Qualified Domain Name). * @return SDC
* Distribution Engine address.
*/
- String getAsdcAddress();
+ String getSdcAddress();
/**
* SDC Distribution Addresses from ONAP Component Values need to be set from
@@ -39,6 +40,32 @@ public interface IConfiguration {
List<String> getMsgBusAddress();
/**
+ * Kafka security.protocol
+ */
+ default String getKafkaSecurityProtocolConfig() {
+ return System.getenv().getOrDefault("SECURITY_PROTOCOL", "SASL_PLAINTEXT");
+ }
+
+ /**
+ * Kafka sasl.mechanism
+ */
+ default String getKafkaSaslMechanism() {
+ return System.getenv().getOrDefault("SASL_MECHANISM", "SCRAM-SHA-512");
+ }
+
+ /**
+ * Kafka sasl.jaas.config
+ */
+ default String getKafkaSaslJaasConfig() {
+ String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG");
+ if(saslJaasConfFromEnv != null) {
+ return saslJaasConfFromEnv;
+ } else {
+ throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
+ }
+ }
+
+ /**
* User Name for SDC distribution consumer authentication.
*
* @return User Name.
@@ -64,7 +91,7 @@ public interface IConfiguration {
String getPassword();
/**
- * Distribution Client Polling Interval towards UEB in seconds. Can Be
+ * Distribution Client Polling Interval towards messaging bus in seconds. Can Be
* reconfigured in runtime.
*
* @return Distribution Client Polling Interval.
@@ -72,7 +99,7 @@ public interface IConfiguration {
int getPollingInterval();
/**
- * Distribution Client Timeout in seconds waiting to UEB server response in each
+ * Distribution Client Timeout in seconds waiting for messaging bus server response in each
* fetch interval. Can Be reconfigured in runtime.
*
* @return Distribution Client Timeout in seconds.
@@ -89,10 +116,10 @@ public interface IConfiguration {
List<String> getRelevantArtifactTypes();
/**
- * Returns the consumer group defined for this ECOMP component, if no consumer
+ * Returns the consumer group defined for this component, if no consumer
* group is defined return null.
*
- * @return Consumer group.
+ * @return SdcKafkaConsumer group.
*/
String getConsumerGroup();
@@ -105,7 +132,7 @@ public interface IConfiguration {
String getEnvironmentName();
/**
- * Unique ID of ECOMP component instance (e.x INSTAR name).
+ * Unique ID of component instance (e.x INSTAR name).
*
* @return
*/
@@ -113,7 +140,7 @@ public interface IConfiguration {
/**
* Return full path to Client's Key Store that contains either CA certificate or
- * the ASDC's public key (e.g /etc/keystore/asdc-client.jks) file will be
+ * the SDC's public key (e.g /etc/keystore/sdc-client.jks) file will be
* deployed with sdc-distribution jar.
*
* @return
@@ -147,15 +174,6 @@ public interface IConfiguration {
boolean isFilterInEmptyResources();
/**
- * By default, Distribution Client will use HTTPS (TLS 1.2) when connecting to
- * DMAAP. This param can be null, then default (HTTPS) behavior will be applied.
- * If set to false, distribution client will use HTTP when connecting to DMAAP.
- *
- * @return
- */
- Boolean isUseHttpsWithDmaap();
-
- /**
* By default, (false value) Distribution Client will trigger the regular
* registration towards SDC (register component as consumer to the
* SDC-DISTR-NOTIF-TOPIC-[ENV] topic and register component as producer to the
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/INotificationData.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/INotificationData.java
index 5df130a..8c24ed4 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/INotificationData.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/INotificationData.java
@@ -25,7 +25,7 @@ import java.util.List;
public interface INotificationData {
/**
- * Global Distribution Identifier: UUID generated by ASDC per each distribution activation.<br>
+ * Global Distribution Identifier: UUID generated by SDC per each distribution activation.<br>
* Generated UUID is compliant with RFC 4122.<br>
* It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br>
* Ex.: AA97B177-9383-4934-8543-0F91A7A02836
@@ -45,7 +45,7 @@ public interface INotificationData {
String getServiceVersion();
/**
- * Global UUID generated by ASDC per each service version. Generated UUID is compliant with RFC 4122.<br>
+ * Global UUID generated by SDC per each service version. Generated UUID is compliant with RFC 4122.<br>
* It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br>
* Ex. : AA97B177-9383-4934-8543-0F91A7A02836
*/
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/IVfModuleMetadata.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/IVfModuleMetadata.java
index 5347189..91415ce 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/IVfModuleMetadata.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/IVfModuleMetadata.java
@@ -45,7 +45,7 @@ public interface IVfModuleMetadata {
/**
* Global UUID of the VF Module.<br>
- * It is generated by ASDC per each new VF module version. Generated UUID is compliant with RFC 4122. It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br>
+ * It is generated by SDC per each new VF module version. Generated UUID is compliant with RFC 4122. It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").<br>
* Ex.: AA97B177-9383-4934-8543-0F91A7A02836
*/
String getVfModuleModelUUID();
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpClientFactory.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpClientFactory.java
index 7a073a1..94e20fb 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpClientFactory.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpClientFactory.java
@@ -182,7 +182,7 @@ public class HttpClientFactory {
return HttpClientBuilder.create().setDefaultCredentialsProvider(credsProvider).setProxy(getHttpsProxyHost())
.setSSLSocketFactory(sslsf).build();
} catch (Exception e) {
- throw new HttpAsdcClientException("Failed to create https client", e);
+ throw new HttpSdcClientException("Failed to create https client", e);
}
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClient.java
index 14c9c7f..8b6ee0a 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClient.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClient.java
@@ -41,9 +41,9 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
-public class HttpAsdcClient implements IHttpAsdcClient {
+public class HttpSdcClient implements IHttpSdcClient {
- private static final Logger log = LoggerFactory.getLogger(HttpAsdcClient.class.getName());
+ private static final Logger log = LoggerFactory.getLogger(HttpSdcClient.class.getName());
private static final boolean ALWAYS_CLOSE_THE_REQUEST_CONNECTION = true;
private final CloseableHttpClient httpClient;
private final String httpSchema;
@@ -56,18 +56,18 @@ public class HttpAsdcClient implements IHttpAsdcClient {
* @deprecated
* This constructor will be removed in the future.
*
- * @param configuration Asdc client configuration
+ * @param configuration Sdc client configuration
*/
@Deprecated
- public HttpAsdcClient(IConfiguration configuration) {
- this(configuration.getAsdcAddress(),
+ public HttpSdcClient(IConfiguration configuration) {
+ this(configuration.getSdcAddress(),
new HttpClientFactory(configuration),
new HttpRequestFactory(configuration.getUser(), configuration.getPassword())
);
}
- public HttpAsdcClient(String asdcAddress, HttpClientFactory httpClientFactory, HttpRequestFactory httpRequestFactory) {
- this.serverFqdn = asdcAddress;
+ public HttpSdcClient(String sdcAddress, HttpClientFactory httpClientFactory, HttpRequestFactory httpRequestFactory) {
+ this.serverFqdn = sdcAddress;
this.httpRequestFactory = httpRequestFactory;
Pair<String, CloseableHttpClient> httpClientPair = httpClientFactory.createInstance();
@@ -75,21 +75,21 @@ public class HttpAsdcClient implements IHttpAsdcClient {
this.httpClient = httpClientPair.getSecond();
}
- public HttpAsdcResponse postRequest(String requestUrl, HttpEntity entity, Map<String, String> headersMap) {
+ public HttpSdcResponse postRequest(String requestUrl, HttpEntity entity, Map<String, String> headersMap) {
return postRequest(requestUrl, entity, headersMap, ALWAYS_CLOSE_THE_REQUEST_CONNECTION).getFirst();
}
- public Pair<HttpAsdcResponse, CloseableHttpResponse> postRequest(String requestUrl, HttpEntity entity, Map<String, String> headersMap, boolean closeTheRequest) {
- Pair<HttpAsdcResponse, CloseableHttpResponse> ret;
+ public Pair<HttpSdcResponse, CloseableHttpResponse> postRequest(String requestUrl, HttpEntity entity, Map<String, String> headersMap, boolean closeTheRequest) {
+ Pair<HttpSdcResponse, CloseableHttpResponse> ret;
final String url = resolveUrl(requestUrl);
log.debug("url to send {}", url);
HttpPost httpPost = httpRequestFactory.createHttpPostRequest(url, headersMap, entity);
CloseableHttpResponse httpResponse = null;
- HttpAsdcResponse response = null;
+ HttpSdcResponse response = null;
try {
httpResponse = httpClient.execute(httpPost);
- response = new HttpAsdcResponse(httpResponse.getStatusLine().getStatusCode(), httpResponse.getEntity());
+ response = new HttpSdcResponse(httpResponse.getStatusLine().getStatusCode(), httpResponse.getEntity());
} catch (IOException e) {
log.error("failed to send request to url: {}", requestUrl);
response = createHttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, "failed to send request");
@@ -100,19 +100,19 @@ public class HttpAsdcClient implements IHttpAsdcClient {
return ret;
}
- public HttpAsdcResponse getRequest(String requestUrl, Map<String, String> headersMap) {
+ public HttpSdcResponse getRequest(String requestUrl, Map<String, String> headersMap) {
return getRequest(requestUrl, headersMap, ALWAYS_CLOSE_THE_REQUEST_CONNECTION).getFirst();
}
- public Pair<HttpAsdcResponse, CloseableHttpResponse> getRequest(String requestUrl, Map<String, String> headersMap, boolean closeTheRequest) {
- Pair<HttpAsdcResponse, CloseableHttpResponse> ret;
+ public Pair<HttpSdcResponse, CloseableHttpResponse> getRequest(String requestUrl, Map<String, String> headersMap, boolean closeTheRequest) {
+ Pair<HttpSdcResponse, CloseableHttpResponse> ret;
final String url = resolveUrl(requestUrl);
log.debug("url to send {}", url);
HttpGet httpGet = httpRequestFactory.createHttpGetRequest(url, headersMap);
CloseableHttpResponse httpResponse = null;
- HttpAsdcResponse response = null;
+ HttpSdcResponse response = null;
try {
httpResponse = httpClient.execute(httpGet);
@@ -122,7 +122,7 @@ public class HttpAsdcClient implements IHttpAsdcClient {
for (Header header : headersRes) {
headersResMap.put(header.getName(), header.getValue());
}
- response = new HttpAsdcResponse(httpResponse.getStatusLine().getStatusCode(), httpResponse.getEntity(), headersResMap);
+ response = new HttpSdcResponse(httpResponse.getStatusLine().getStatusCode(), httpResponse.getEntity(), headersResMap);
} catch (UnknownHostException | ConnectException e) {
log.error("failed to connect to url: {}", requestUrl, e);
@@ -145,8 +145,8 @@ public class HttpAsdcClient implements IHttpAsdcClient {
return this.httpSchema + serverFqdn + requestUrl;
}
- private Pair<HttpAsdcResponse, CloseableHttpResponse> finalizeHttpRequest(boolean closeTheRequest, CloseableHttpResponse httpResponse, HttpAsdcResponse response) {
- Pair<HttpAsdcResponse, CloseableHttpResponse> ret;
+ private Pair<HttpSdcResponse, CloseableHttpResponse> finalizeHttpRequest(boolean closeTheRequest, CloseableHttpResponse httpResponse, HttpSdcResponse response) {
+ Pair<HttpSdcResponse, CloseableHttpResponse> ret;
if (closeTheRequest) {
if (httpResponse != null) {
try {
@@ -163,8 +163,8 @@ public class HttpAsdcClient implements IHttpAsdcClient {
return ret;
}
- static HttpAsdcResponse createHttpResponse(int httpStatusCode, String httpMessage) {
- return new HttpAsdcResponse(httpStatusCode, new StringEntity(httpMessage, StandardCharsets.UTF_8));
+ static HttpSdcResponse createHttpResponse(int httpStatusCode, String httpMessage) {
+ return new HttpSdcResponse(httpStatusCode, new StringEntity(httpMessage, StandardCharsets.UTF_8));
}
public void closeHttpClient() {
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClientException.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClientException.java
index 8d6a527..d3747f5 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClientException.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClientException.java
@@ -19,9 +19,9 @@
*/
package org.onap.sdc.http;
-public class HttpAsdcClientException extends RuntimeException {
+public class HttpSdcClientException extends RuntimeException {
- public HttpAsdcClientException(String message, Throwable cause) {
+ public HttpSdcClientException(String message, Throwable cause) {
super(message, cause);
}
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcResponse.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcResponse.java
index 8b85684..ad64f3f 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcResponse.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcResponse.java
@@ -24,19 +24,19 @@ import java.util.Map;
import org.apache.http.HttpEntity;
-public class HttpAsdcResponse {
+public class HttpSdcResponse {
private int status;
private HttpEntity message;
private Map<String, String> headersMap;
- public HttpAsdcResponse(int status, HttpEntity message) {
+ public HttpSdcResponse(int status, HttpEntity message) {
super();
this.status = status;
this.message = message;
}
- public HttpAsdcResponse(int status, HttpEntity message, Map<String, String> headersMap) {
+ public HttpSdcResponse(int status, HttpEntity message, Map<String, String> headersMap) {
super();
this.status = status;
this.message = message;
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpAsdcClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpSdcClient.java
index 2a0d9a0..9adce23 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpAsdcClient.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpSdcClient.java
@@ -24,11 +24,11 @@ import java.util.Map;
import org.apache.http.HttpEntity;
-public interface IHttpAsdcClient {
+public interface IHttpSdcClient {
- HttpAsdcResponse postRequest(String requestUrl, HttpEntity entity, Map<String, String> headersMap);
+ HttpSdcResponse postRequest(String requestUrl, HttpEntity entity, Map<String, String> headersMap);
- HttpAsdcResponse getRequest(String requestUrl, Map<String, String> headersMap);
+ HttpSdcResponse getRequest(String requestUrl, Map<String, String> headersMap);
void closeHttpClient();
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java
index 7aac780..18aba95 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java
@@ -21,6 +21,10 @@
package org.onap.sdc.http;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import fj.data.Either;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
@@ -30,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
-
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
@@ -38,33 +41,26 @@ import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
+import org.onap.sdc.api.consumer.IConfiguration;
import org.onap.sdc.api.notification.IArtifactInfo;
import org.onap.sdc.api.results.IDistributionClientResult;
+import org.onap.sdc.impl.DistributionClientDownloadResultImpl;
import org.onap.sdc.impl.DistributionClientResultImpl;
import org.onap.sdc.utils.DistributionActionResultEnum;
-import org.onap.sdc.api.asdc.RegistrationRequest;
-import org.onap.sdc.api.consumer.IConfiguration;
-import org.onap.sdc.impl.DistributionClientDownloadResultImpl;
import org.onap.sdc.utils.DistributionClientConstants;
import org.onap.sdc.utils.Pair;
+import org.onap.sdc.utils.kafka.KafkaDataResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.apiClient.credentials.ApiCredential;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
-
-import fj.data.Either;
-
public class SdcConnectorClient {
private static final Logger log = LoggerFactory.getLogger(SdcConnectorClient.class.getName());
static final String CONTENT_DISPOSITION_HEADER = "Content-Disposition";
private final IConfiguration configuration;
- private final HttpAsdcClient httpClient;
+ private final HttpSdcClient httpClient;
- public SdcConnectorClient(IConfiguration configuration, HttpAsdcClient httpClient) {
+ public SdcConnectorClient(IConfiguration configuration, HttpSdcClient httpClient) {
Objects.requireNonNull(configuration);
Objects.requireNonNull(httpClient);
this.configuration = configuration;
@@ -76,23 +72,37 @@ public class SdcConnectorClient {
}
public Either<List<String>, IDistributionClientResult> getValidArtifactTypesList() {
- Pair<HttpAsdcResponse, CloseableHttpResponse> getServersResponsePair = performAsdcServerRequest();
- HttpAsdcResponse getArtifactTypeResponse = getServersResponsePair.getFirst();
+ Pair<HttpSdcResponse, CloseableHttpResponse> getServersResponsePair = performSdcServerRequest(SdcUrls.GET_VALID_ARTIFACT_TYPES);
+ HttpSdcResponse getArtifactTypeResponse = getServersResponsePair.getFirst();
Either<List<String>, IDistributionClientResult> response;
if (getArtifactTypeResponse.getStatus() == HttpStatus.SC_OK) {
response = parseGetValidArtifactTypesResponse(getArtifactTypeResponse);
} else {
- IDistributionClientResult asdcError = handleAsdcError(getArtifactTypeResponse);
- response = Either.right(asdcError);
+ IDistributionClientResult sdcError = handleSdcError(getArtifactTypeResponse);
+ response = Either.right(sdcError);
}
- handeAsdcConnectionClose(getServersResponsePair);
+ handeSdcConnectionClose(getServersResponsePair);
return response;
}
- private void handeAsdcConnectionClose(Pair<HttpAsdcResponse, CloseableHttpResponse> getServersResponsePair) {
+ public Either<KafkaDataResponse, IDistributionClientResult> getKafkaDistData() {
+ Pair<HttpSdcResponse, CloseableHttpResponse> getServersResponsePair = performSdcServerRequest(SdcUrls.GET_KAFKA_DIST_DATA);
+ HttpSdcResponse getKafkaDistDataResponse = getServersResponsePair.getFirst();
+ Either<KafkaDataResponse, IDistributionClientResult> response;
+ if (getKafkaDistDataResponse.getStatus() == HttpStatus.SC_OK) {
+ response = parseGetKafkaDistDataResponse(getKafkaDistDataResponse);
+ } else {
+ IDistributionClientResult sdcError = handleSdcError(getKafkaDistDataResponse);
+ response = Either.right(sdcError);
+ }
+ handeSdcConnectionClose(getServersResponsePair);
+ return response;
+ }
+
+ private void handeSdcConnectionClose(Pair<HttpSdcResponse, CloseableHttpResponse> getServersResponsePair) {
if (getServersResponsePair.getSecond() != null) {
try {
getServersResponsePair.getSecond().close();
@@ -104,79 +114,17 @@ public class SdcConnectorClient {
}
}
- private Pair<HttpAsdcResponse, CloseableHttpResponse> performAsdcServerRequest() {
+ private Pair<HttpSdcResponse, CloseableHttpResponse> performSdcServerRequest(String sdcUrl) {
String requestId = generateRequestId();
Map<String, String> requestHeaders = addHeadersToHttpRequest(requestId);
- log.debug("about to perform getServerList. requestId= {} url= {}", requestId, AsdcUrls.GET_VALID_ARTIFACT_TYPES);
- return httpClient.getRequest(AsdcUrls.GET_VALID_ARTIFACT_TYPES, requestHeaders, false);
- }
-
- public Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics(ApiCredential credential) {
-
- Either<TopicRegistrationResponse, DistributionClientResultImpl> response;
-
- String requestId = generateRequestId();
- Map<String, String> requestHeaders = addHeadersToHttpRequest(requestId);
-
- RegistrationRequest registrationRequest = new RegistrationRequest(credential.getApiKey(), configuration.getEnvironmentName(), configuration.isConsumeProduceStatusTopic(), configuration.getMsgBusAddress());
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- String jsonRequest = gson.toJson(registrationRequest);
- StringEntity body = new StringEntity(jsonRequest, ContentType.APPLICATION_JSON);
-
- log.debug("about to perform registerAsdcTopics. requestId= {} url= {}", requestId, AsdcUrls.POST_FOR_TOPIC_REGISTRATION);
- Pair<HttpAsdcResponse, CloseableHttpResponse> registerResponsePair = httpClient.postRequest(AsdcUrls.POST_FOR_TOPIC_REGISTRATION, body, requestHeaders, false);
- HttpAsdcResponse registerResponse = registerResponsePair.getFirst();
- int status = registerResponse.getStatus();
-
- if (status == HttpStatus.SC_OK) {
- response = parseRegistrationResponse(registerResponse);
-
- } else {
- DistributionClientResultImpl asdcError = handleAsdcError(registerResponse);
- return Either.right(asdcError);
- }
- handeAsdcConnectionClose(registerResponsePair);
-
- log.debug("registerAsdcTopics response= {}. requestId= {} url= {}", status, requestId, AsdcUrls.POST_FOR_TOPIC_REGISTRATION);
- return response;
-
+ log.debug("about to perform get on SDC. requestId= {} url= {}", requestId, sdcUrl);
+ return httpClient.getRequest(sdcUrl, requestHeaders, false);
}
private String generateRequestId() {
return UUID.randomUUID().toString();
}
- public IDistributionClientResult unregisterTopics(ApiCredential credential) {
-
- DistributionClientResultImpl response;
-
- String requestId = generateRequestId();
- Map<String, String> requestHeaders = addHeadersToHttpRequest(requestId);
-
- RegistrationRequest registrationRequest = new RegistrationRequest(credential.getApiKey(), configuration.getEnvironmentName(), configuration.isConsumeProduceStatusTopic(), configuration.getMsgBusAddress());
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- String jsonRequest = gson.toJson(registrationRequest);
- StringEntity body = new StringEntity(jsonRequest, ContentType.APPLICATION_JSON);
-
- log.debug("about to perform unregisterTopics. requestId= {} url= {}", requestId, AsdcUrls.POST_FOR_UNREGISTER);
- Pair<HttpAsdcResponse, CloseableHttpResponse> unRegisterResponsePair = httpClient.postRequest(AsdcUrls.POST_FOR_UNREGISTER, body, requestHeaders, false);
- HttpAsdcResponse unRegisterResponse = unRegisterResponsePair.getFirst();
- int status = unRegisterResponse.getStatus();
- if (status == HttpStatus.SC_NO_CONTENT || status == HttpStatus.SC_OK) {
- response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "unregistration successful");
-
- } else {
- response = handleAsdcError(unRegisterResponse);
- }
-
- handeAsdcConnectionClose(unRegisterResponsePair);
-
- log.debug("unregisterTopics response = {}. requestId= {} url= {}", status, requestId, AsdcUrls.POST_FOR_UNREGISTER);
-
- return response;
-
- }
-
public DistributionClientDownloadResultImpl downloadArtifact(IArtifactInfo artifactInfo) {
DistributionClientDownloadResultImpl response;
@@ -186,23 +134,23 @@ public class SdcConnectorClient {
requestHeaders.put(DistributionClientConstants.HEADER_INSTANCE_ID, configuration.getConsumerID());
requestHeaders.put(HttpHeaders.ACCEPT, ContentType.APPLICATION_OCTET_STREAM.toString());
String requestUrl = artifactInfo.getArtifactURL();
- Pair<HttpAsdcResponse, CloseableHttpResponse> downloadPair = httpClient.getRequest(requestUrl, requestHeaders, false);
- HttpAsdcResponse downloadResponse = downloadPair.getFirst();
+ Pair<HttpSdcResponse, CloseableHttpResponse> downloadPair = httpClient.getRequest(requestUrl, requestHeaders, false);
+ HttpSdcResponse downloadResponse = downloadPair.getFirst();
int status = downloadResponse.getStatus();
if (status == HttpStatus.SC_OK) {
response = parseDownloadArtifactResponse(artifactInfo, downloadResponse);
} else {
- response = handleAsdcDownloadArtifactError(downloadResponse);
+ response = handleSdcDownloadArtifactError(downloadResponse);
}
- handeAsdcConnectionClose(downloadPair);
+ handeSdcConnectionClose(downloadPair);
return response;
}
/* **************************** private methods ********************************************/
- private Either<List<String>, IDistributionClientResult> parseGetValidArtifactTypesResponse(HttpAsdcResponse getArtifactTypesResponse) {
+ private Either<List<String>, IDistributionClientResult> parseGetValidArtifactTypesResponse(HttpSdcResponse getArtifactTypesResponse) {
Either<List<String>, IDistributionClientResult> result;
try {
String jsonMessage = IOUtils.toString(getArtifactTypesResponse.getMessage().getContent());
@@ -219,39 +167,33 @@ public class SdcConnectorClient {
return result;
}
- private Either<List<String>, IDistributionClientResult> handleParsingError(Exception e) {
- Either<List<String>, IDistributionClientResult> result;
- log.error("failed to parse response from ASDC. error: ", e);
- IDistributionClientResult response = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to parse response from ASDC");
- result = Either.right(response);
- return result;
- }
-
- Either<TopicRegistrationResponse, DistributionClientResultImpl> parseRegistrationResponse(HttpAsdcResponse registerResponse) {
-
- String jsonMessage;
+ private Either<KafkaDataResponse, IDistributionClientResult> parseGetKafkaDistDataResponse(HttpSdcResponse getKafkaDistDataResponse) {
+ Either<KafkaDataResponse, IDistributionClientResult> result;
try {
- jsonMessage = IOUtils.toString(registerResponse.getMessage().getContent());
-
+ String jsonMessage = IOUtils.toString(getKafkaDistDataResponse.getMessage().getContent());
Gson gson = new GsonBuilder().create();
- TopicRegistrationResponse registrationResponse = gson.fromJson(jsonMessage, TopicRegistrationResponse.class);
-
- if (registrationResponse.getDistrNotificationTopicName() == null) {
- DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.FAIL, "failed to receive notification topic from ASDC");
- return Either.right(response);
- }
-
- if (registrationResponse.getDistrStatusTopicName() == null) {
- DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.FAIL, "failed to receive status topic from ASDC");
- return Either.right(response);
- }
- return Either.left(registrationResponse);
-
+ KafkaDataResponse kafkaData = gson.fromJson(jsonMessage, KafkaDataResponse.class);
+ result = Either.left(kafkaData);
} catch (UnsupportedOperationException | IOException e) {
- log.error("failed to parse response from ASDC. error: ", e);
- DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to parse response from ASDC");
- return Either.right(response);
+ result = handleKafkaParsingError(e);
}
+ return result;
+ }
+
+ private Either<KafkaDataResponse, IDistributionClientResult> handleKafkaParsingError(Exception e) {
+ Either<KafkaDataResponse, IDistributionClientResult> result;
+ log.error("failed to parse kafka data response from SDC. error: ", e);
+ IDistributionClientResult response = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to parse kafka data response from SDC");
+ result = Either.right(response);
+ return result;
+ }
+
+ private Either<List<String>, IDistributionClientResult> handleParsingError(Exception e) {
+ Either<List<String>, IDistributionClientResult> result;
+ log.error("failed to parse response from SDC. error: ", e);
+ IDistributionClientResult response = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to parse response from SDC");
+ result = Either.right(response);
+ return result;
}
protected Map<String, String> addHeadersToHttpRequest(String requestId) {
@@ -263,29 +205,29 @@ public class SdcConnectorClient {
return requestHeaders;
}
- private DistributionClientResultImpl handleAsdcError(HttpAsdcResponse registerResponse) {
+ private DistributionClientResultImpl handleSdcError(HttpSdcResponse registerResponse) {
int status = registerResponse.getStatus();
- DistributionClientResultImpl errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to send request to ASDC");
+ DistributionClientResultImpl errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to send request to SDC");
if (status == HttpStatus.SC_UNAUTHORIZED) {
- errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_AUTHENTICATION_FAILED, "authentication to ASDC failed for user " + configuration.getUser());
+ errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_AUTHENTICATION_FAILED, "authentication to SDC failed for user " + configuration.getUser());
} else if (status == HttpStatus.SC_FORBIDDEN) {
- errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_AUTHORIZATION_FAILED, "authorization failure for user " + configuration.getUser());
+ errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_AUTHORIZATION_FAILED, "authorization failure for user " + configuration.getUser());
} else if (status == HttpStatus.SC_BAD_REQUEST) {
- errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.BAD_REQUEST, "ASDC call failed due to missing information");
+ errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.BAD_REQUEST, "SDC call failed due to missing information");
} else if (status == HttpStatus.SC_NOT_FOUND) {
- errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_NOT_FOUND, "ASDC not found");
+ errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_NOT_FOUND, "SDC not found");
} else if (status == HttpStatus.SC_INTERNAL_SERVER_ERROR) {
- errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_SERVER_PROBLEM, "ASDC server problem");
+ errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_SERVER_PROBLEM, "SDC server problem");
} else if (status == HttpStatus.SC_BAD_GATEWAY) {
- errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, "ASDC server problem");
+ errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_CONNECTION_FAILED, "SDC server problem");
} else if (status == HttpStatus.SC_GATEWAY_TIMEOUT) {
- errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_SERVER_TIMEOUT, "ASDC server problem");
+ errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_SERVER_TIMEOUT, "SDC server problem");
}
- log.error("status from ASDC is {}", registerResponse);
+ log.error("status from SDC is {}", registerResponse);
log.error(errorResponse.toString());
try {
String errorString = IOUtils.toString(registerResponse.getMessage().getContent());
- log.debug("error from ASDC is: {}", errorString);
+ log.debug("error from SDC is: {}", errorString);
} catch (UnsupportedOperationException | IOException e) {
log.error("During error handling another exception occurred: ", e);
}
@@ -293,27 +235,27 @@ public class SdcConnectorClient {
}
- private DistributionClientDownloadResultImpl handleAsdcDownloadArtifactError(HttpAsdcResponse registerResponse) {
+ private DistributionClientDownloadResultImpl handleSdcDownloadArtifactError(HttpSdcResponse registerResponse) {
int status = registerResponse.getStatus();
- DistributionClientDownloadResultImpl errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to send request to ASDC");
+ DistributionClientDownloadResultImpl errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to send request to SDC");
if (status == HttpStatus.SC_UNAUTHORIZED) {
- errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.ASDC_AUTHENTICATION_FAILED, "authentication to ASDC failed for user " + configuration.getUser());
+ errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.SDC_AUTHENTICATION_FAILED, "authentication to SDC failed for user " + configuration.getUser());
} else if (status == HttpStatus.SC_FORBIDDEN) {
- errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.ASDC_AUTHORIZATION_FAILED, "authorization failure for user " + configuration.getUser());
+ errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.SDC_AUTHORIZATION_FAILED, "authorization failure for user " + configuration.getUser());
} else if (status == HttpStatus.SC_BAD_REQUEST || status == HttpStatus.SC_NOT_FOUND) {
errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.ARTIFACT_NOT_FOUND, "Specified artifact is not found");
// } else if (status == 404){
// errorResponse = new DistributionClientDownloadResultImpl(
- // DistributionActionResultEnum.ASDC_NOT_FOUND,
- // "ASDC not found");
+ // DistributionActionResultEnum.SDC_NOT_FOUND,
+ // "SDC not found");
} else if (status == HttpStatus.SC_INTERNAL_SERVER_ERROR) {
- errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.ASDC_SERVER_PROBLEM, "ASDC server problem");
+ errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.SDC_SERVER_PROBLEM, "SDC server problem");
}
- log.error("status from ASDC is {}", registerResponse);
+ log.error("status from SDC is {}", registerResponse);
log.error(errorResponse.toString());
try {
String errorString = IOUtils.toString(registerResponse.getMessage().getContent());
- log.debug("error from ASDC is: {}", errorString);
+ log.debug("error from SDC is: {}", errorString);
} catch (UnsupportedOperationException | IOException e) {
log.error("During error handling another exception occurred: ", e);
}
@@ -321,7 +263,7 @@ public class SdcConnectorClient {
}
- private DistributionClientDownloadResultImpl parseDownloadArtifactResponse(IArtifactInfo artifactInfo, HttpAsdcResponse getServersResponse) {
+ private DistributionClientDownloadResultImpl parseDownloadArtifactResponse(IArtifactInfo artifactInfo, HttpSdcResponse getServersResponse) {
HttpEntity entity = getServersResponse.getMessage();
InputStream is;
try {
@@ -333,7 +275,7 @@ public class SdcConnectorClient {
byte[] payload = IOUtils.toByteArray(is);
if (artifactInfo.getArtifactChecksum() == null || artifactInfo.getArtifactChecksum().isEmpty()) {
- return new DistributionClientDownloadResultImpl(DistributionActionResultEnum.DATA_INTEGRITY_PROBLEM, "failed to get artifact from ASDC. Empty checksum");
+ return new DistributionClientDownloadResultImpl(DistributionActionResultEnum.DATA_INTEGRITY_PROBLEM, "failed to get artifact from SDC. Empty checksum");
}
return new DistributionClientDownloadResultImpl(DistributionActionResultEnum.SUCCESS, "success", artifactName, payload);
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/ServerListResponse.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcUrls.java
index b0cfb1e..4b9a648 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/ServerListResponse.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcUrls.java
@@ -18,18 +18,13 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.sdc.api.asdc;
+package org.onap.sdc.http;
-import java.util.List;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
+public class SdcUrls {
-@Getter
-@Setter
-@NoArgsConstructor
-public class ServerListResponse {
-
- private List<String> uebServerList;
+ private SdcUrls() {
+ }
+ public static final String GET_VALID_ARTIFACT_TYPES = "/sdc/v1/artifactTypes";
+ public static final String GET_KAFKA_DIST_DATA = "/sdc/v1/distributionKafkaData";
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/TopicRegistrationResponse.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/TopicRegistrationResponse.java
deleted file mode 100644
index 448005c..0000000
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/TopicRegistrationResponse.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * sdc-distribution-client
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.sdc.http;
-
-public class TopicRegistrationResponse {
- private String distrNotificationTopicName;
- private String distrStatusTopicName;
-
-
- public void setDistrNotificationTopicName(String distrNotificationTopicName) {
- this.distrNotificationTopicName = distrNotificationTopicName;
- }
-
- public void setDistrStatusTopicName(String distrStatusTopicName) {
- this.distrStatusTopicName = distrStatusTopicName;
- }
-
- public String getDistrNotificationTopicName() {
- return distrNotificationTopicName;
- }
-
- public String getDistrStatusTopicName() {
- return distrStatusTopicName;
- }
-}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
index 2da9c4e..db4433b 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
@@ -28,7 +28,12 @@ import org.onap.sdc.utils.DistributionClientConstants;
public class Configuration implements IConfiguration {
private List<String> msgBusAddressList;
- private String asdcAddress;
+ private final String kafkaSecurityProtocolConfig;
+ private final String kafkaSaslMechanism;
+ private final String kafkaSaslJaasConfig;
+ private String sdcStatusTopicName;
+ private String sdcNotificationTopicName;
+ private String sdcAddress;
private String user;
private String password;
private int pollingInterval = DistributionClientConstants.MIN_POLLING_INTERVAL_SEC;
@@ -40,10 +45,9 @@ public class Configuration implements IConfiguration {
private String keyStorePath;
private String keyStorePassword;
private boolean activateServerTLSAuth;
- private boolean filterInEmptyResources;
- private Boolean useHttpsWithDmaap;
+ private final boolean filterInEmptyResources;
private Boolean useHttpsWithSDC;
- private boolean consumeProduceStatusTopic;
+ private final boolean consumeProduceStatusTopic;
private String httpProxyHost;
private int httpProxyPort;
private String httpsProxyHost;
@@ -51,23 +55,24 @@ public class Configuration implements IConfiguration {
private boolean useSystemProxy;
public Configuration(IConfiguration other) {
- this.asdcAddress = other.getAsdcAddress();
- this.msgBusAddressList = other.getMsgBusAddress();
+ this.kafkaSecurityProtocolConfig = other.getKafkaSecurityProtocolConfig();
+ this.kafkaSaslMechanism = other.getKafkaSaslMechanism();
+ this.kafkaSaslJaasConfig = other.getKafkaSaslJaasConfig();
this.comsumerID = other.getConsumerID();
this.consumerGroup = other.getConsumerGroup();
- this.environmentName = other.getEnvironmentName();
- this.password = other.getPassword();
this.pollingInterval = other.getPollingInterval();
this.pollingTimeout = other.getPollingTimeout();
- this.relevantArtifactTypes = other.getRelevantArtifactTypes();
+ this.environmentName = other.getEnvironmentName();
+ this.consumeProduceStatusTopic = other.isConsumeProduceStatusTopic();
+ this.sdcAddress = other.getSdcAddress();
this.user = other.getUser();
+ this.password = other.getPassword();
+ this.relevantArtifactTypes = other.getRelevantArtifactTypes();
this.useHttpsWithSDC = other.isUseHttpsWithSDC();
this.keyStorePath = other.getKeyStorePath();
this.keyStorePassword = other.getKeyStorePassword();
this.activateServerTLSAuth = other.activateServerTLSAuth();
this.filterInEmptyResources = other.isFilterInEmptyResources();
- this.useHttpsWithDmaap = other.isUseHttpsWithDmaap();
- this.consumeProduceStatusTopic = other.isConsumeProduceStatusTopic();
this.httpProxyHost = other.getHttpProxyHost();
this.httpProxyPort = other.getHttpProxyPort();
this.httpsProxyHost = other.getHttpsProxyHost();
@@ -76,8 +81,24 @@ public class Configuration implements IConfiguration {
}
@Override
- public String getAsdcAddress() {
- return asdcAddress;
+ public String getSdcAddress() {
+ return sdcAddress;
+ }
+
+ public String getStatusTopicName() {
+ return sdcStatusTopicName;
+ }
+
+ public void setStatusTopicName(String sdcStatusTopicName) {
+ this.sdcStatusTopicName = sdcStatusTopicName;
+ }
+
+ public String getNotificationTopicName() {
+ return sdcNotificationTopicName;
+ }
+
+ public void setNotificationTopicName(String sdcNotificationTopicName) {
+ this.sdcNotificationTopicName = sdcNotificationTopicName;
}
@Override
@@ -85,6 +106,25 @@ public class Configuration implements IConfiguration {
return msgBusAddressList;
}
+ public void setMsgBusAddress(List<String> newMsgBusAddress) {
+ msgBusAddressList = newMsgBusAddress;
+ }
+
+ @Override
+ public String getKafkaSecurityProtocolConfig() {
+ return kafkaSecurityProtocolConfig;
+ }
+
+ @Override
+ public String getKafkaSaslMechanism() {
+ return kafkaSaslMechanism;
+ }
+
+ @Override
+ public String getKafkaSaslJaasConfig() {
+ return kafkaSaslJaasConfig;
+ }
+
@Override
public Boolean isUseHttpsWithSDC() {
return useHttpsWithSDC;
@@ -169,8 +209,8 @@ public class Configuration implements IConfiguration {
this.comsumerID = comsumerID;
}
- public void setAsdcAddress(String asdcAddress) {
- this.asdcAddress = asdcAddress;
+ public void setSdcAddress(String sdcAddress) {
+ this.sdcAddress = sdcAddress;
}
public void setUser(String user) {
@@ -243,19 +283,10 @@ public class Configuration implements IConfiguration {
return this.filterInEmptyResources;
}
- @Override
- public Boolean isUseHttpsWithDmaap() {
- return this.useHttpsWithDmaap;
- }
-
public void setUseHttpsWithSDC(boolean useHttpsWithSDC) {
this.useHttpsWithSDC = useHttpsWithSDC;
}
- public void setUseHttpsWithDmaap(boolean useHttpsWithDmaap) {
- this.useHttpsWithDmaap = useHttpsWithDmaap;
- }
-
@Override
public boolean isConsumeProduceStatusTopic() {
return this.consumeProduceStatusTopic;
@@ -265,11 +296,13 @@ public class Configuration implements IConfiguration {
public String toString() {
//@formatter:off
return "Configuration ["
- + "asdcAddress=" + asdcAddress
+ + "sdcAddress=" + sdcAddress
+ ", user=" + user
+ ", password=" + password
+ ", useHttpsWithSDC=" + useHttpsWithSDC
+ ", pollingInterval=" + pollingInterval
+ + ", sdcStatusTopicName=" + sdcStatusTopicName
+ + ", sdcNotificationTopicName=" + sdcNotificationTopicName
+ ", pollingTimeout=" + pollingTimeout
+ ", relevantArtifactTypes=" + relevantArtifactTypes
+ ", consumerGroup=" + consumerGroup
@@ -279,7 +312,6 @@ public class Configuration implements IConfiguration {
+ ", keyStorePassword=" + keyStorePassword
+ ", activateServerTLSAuth=" + activateServerTLSAuth
+ ", filterInEmptyResources=" + filterInEmptyResources
- + ", useHttpsWithDmaap=" + useHttpsWithDmaap
+ ", consumeProduceStatusTopic=" + consumeProduceStatusTopic
+ ", useSystemProxy=" + useSystemProxy
+ ", httpProxyHost=" + httpProxyHost
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java
index b645ed1..829c6ce 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java
@@ -19,23 +19,22 @@
*/
package org.onap.sdc.impl;
-import org.onap.sdc.api.consumer.IConfiguration;
-import org.onap.sdc.api.consumer.IStatusCallback;
-import org.onap.sdc.utils.DistributionActionResultEnum;
-import org.onap.sdc.utils.DistributionClientConstants;
-
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.regex.Matcher;
+import org.onap.sdc.api.consumer.IConfiguration;
+import org.onap.sdc.api.consumer.IStatusCallback;
+import org.onap.sdc.utils.DistributionActionResultEnum;
+import org.onap.sdc.utils.DistributionClientConstants;
public class ConfigurationValidator {
private Map<Function<IConfiguration, Boolean>, DistributionActionResultEnum> cachedValidators;
- DistributionActionResultEnum validateConfiguration(IConfiguration conf, IStatusCallback statusCallback) {
+ public DistributionActionResultEnum validateConfiguration(IConfiguration conf, IStatusCallback statusCallback) {
final Map<Function<IConfiguration, Boolean>, DistributionActionResultEnum> validators = getValidators(statusCallback);
for (Map.Entry<Function<IConfiguration, Boolean>, DistributionActionResultEnum> validation : validators.entrySet()) {
@@ -53,10 +52,8 @@ public class ConfigurationValidator {
validators.put(isCustomerIdNotSet(), DistributionActionResultEnum.CONF_MISSING_CONSUMER_ID);
validators.put(isUserNotSet(), DistributionActionResultEnum.CONF_MISSING_USERNAME);
validators.put(isPasswordNotSet(), DistributionActionResultEnum.CONF_MISSING_PASSWORD);
- validators.put(isMsgBusAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS);
- validators.put(isAsdcAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN);
- validators.put(isFqdnValid(), DistributionActionResultEnum.CONF_INVALID_ASDC_FQDN);
- validators.put(areFqdnsValid(), DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS);
+ validators.put(isSdcAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_SDC_FQDN);
+ validators.put(isFqdnValid(), DistributionActionResultEnum.CONF_INVALID_SDC_FQDN);
validators.put(isEnvNameNotSet(), DistributionActionResultEnum.CONF_MISSING_ENVIRONMENT_NAME);
validators.put(isRelevantArtifactTypesNotSet(), DistributionActionResultEnum.CONF_MISSING_ARTIFACT_TYPES);
validators.put(isConsumeStatusTopicWithCallbackNotSet(statusCallback), DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG);
@@ -85,17 +82,10 @@ public class ConfigurationValidator {
return it -> it.getEnvironmentName() == null || it.getEnvironmentName().isEmpty();
}
- private Function<IConfiguration, Boolean> areFqdnsValid() {
- return it -> !isValidFqdns(it.getMsgBusAddress());
- }
-
private Function<IConfiguration, Boolean> isFqdnValid() {
- return it -> !isValidFqdn(it.getAsdcAddress());
+ return it -> !isValidFqdn(it.getSdcAddress());
}
- private Function<IConfiguration, Boolean> isMsgBusAddressNotSet() {
- return it -> it.getMsgBusAddress() == null || it.getMsgBusAddress().isEmpty();
- }
private Function<IConfiguration, Boolean> isPasswordNotSet() {
return it -> it.getPassword() == null || it.getPassword().isEmpty();
@@ -113,8 +103,8 @@ public class ConfigurationValidator {
return it -> it.getConsumerID() == null || it.getConsumerID().isEmpty();
}
- private Function<IConfiguration, Boolean> isAsdcAddressNotSet() {
- return it -> it.getAsdcAddress() == null || it.getAsdcAddress().isEmpty();
+ private Function<IConfiguration, Boolean> isSdcAddressNotSet() {
+ return it -> it.getSdcAddress() == null || it.getSdcAddress().isEmpty();
}
static boolean isValidFqdn(String fqdn) {
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
index 3a25abb..a34ba1e 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
@@ -23,11 +23,12 @@ package org.onap.sdc.impl;
import static java.util.Objects.isNull;
-import java.io.IOException;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import fj.data.Either;
import java.lang.reflect.Type;
-import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
-import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -35,8 +36,8 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import org.apache.http.HttpHost;
+import org.apache.kafka.common.KafkaException;
import org.onap.sdc.api.IDistributionClient;
import org.onap.sdc.api.IDistributionStatusMessageJsonBuilder;
import org.onap.sdc.api.consumer.IComponentDoneStatusMessage;
@@ -46,53 +47,35 @@ import org.onap.sdc.api.consumer.IFinalDistrStatusMessage;
import org.onap.sdc.api.consumer.INotificationCallback;
import org.onap.sdc.api.consumer.IStatusCallback;
import org.onap.sdc.api.notification.IArtifactInfo;
+import org.onap.sdc.api.notification.IVfModuleMetadata;
import org.onap.sdc.api.results.IDistributionClientDownloadResult;
import org.onap.sdc.api.results.IDistributionClientResult;
-import org.onap.sdc.http.HttpAsdcClient;
+import org.onap.sdc.http.HttpClientFactory;
+import org.onap.sdc.http.HttpRequestFactory;
+import org.onap.sdc.http.HttpSdcClient;
import org.onap.sdc.http.SdcConnectorClient;
-import org.onap.sdc.http.TopicRegistrationResponse;
import org.onap.sdc.utils.DistributionActionResultEnum;
import org.onap.sdc.utils.DistributionClientConstants;
-import org.onap.sdc.utils.GeneralUtils;
import org.onap.sdc.utils.NotificationSender;
import org.onap.sdc.utils.Pair;
import org.onap.sdc.utils.Wrapper;
-import org.onap.sdc.api.notification.IVfModuleMetadata;
+import org.onap.sdc.utils.kafka.KafkaDataResponse;
+import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
+import org.onap.sdc.utils.kafka.SdcKafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.apiClient.credentials.ApiCredential;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
-import com.att.nsa.cambria.client.CambriaClientBuilders.AbstractAuthenticatedManagerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.att.nsa.cambria.client.CambriaIdentityManager;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
-
-import fj.data.Either;
-
public class DistributionClientImpl implements IDistributionClient {
- private static final int POLLING_TIMEOUT_MULTIPLIER = 1000;
private static final int TERMINATION_TIMEOUT = 60;
private final Logger log;
- private SdcConnectorClient asdcConnector;
+ private SdcConnectorClient sdcConnector;
private ScheduledExecutorService executorPool = null;
- protected CambriaIdentityManager cambriaIdentityManager = null;
- private List<String> brokerServers;
- private ApiCredential credential;
+ private SdcKafkaProducer producer;
protected Configuration configuration;
private INotificationCallback callback;
private IStatusCallback statusCallback;
- private String notificationTopic;
- private String statusTopic;
private boolean isConsumerGroupGenerated = false;
private NotificationSender notificationSender;
private final ConfigurationValidator configurationValidator = new ConfigurationValidator();
@@ -110,12 +93,69 @@ public class DistributionClientImpl implements IDistributionClient {
}
@Override
+ public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback, IStatusCallback statusCallback) {
+ IDistributionClientResult initResult;
+ if (!conf.isConsumeProduceStatusTopic()) {
+ initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG,
+ "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'");
+
+ } else if (isNull(statusCallback)) {
+ initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG,
+ "configuration is invalid: statusCallback is not defined");
+ } else {
+ this.statusCallback = statusCallback;
+ initResult = init(conf, notificationCallback);
+ }
+ return initResult;
+ }
+
+ @Override
+ public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback callback) {
+
+ log.info("DistributionClient - init");
+
+ Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
+ validateNotInitilized(errorWrapper);
+ if (errorWrapper.isEmpty()) {
+ validateNotTerminated(errorWrapper);
+ }
+ if (errorWrapper.isEmpty()) {
+ this.configuration = validateAndInitConfiguration(errorWrapper, conf).getSecond();
+ this.sdcConnector = createSdcConnector(configuration);
+ }
+ if (errorWrapper.isEmpty()) {
+ validateArtifactTypesWithSdcServer(conf, errorWrapper);
+ }
+ if (errorWrapper.isEmpty()) {
+ this.callback = callback;
+ }
+ if (errorWrapper.isEmpty()) {
+ initKafkaData(errorWrapper);
+ }
+ if (errorWrapper.isEmpty()) {
+ initKafkaProducer(errorWrapper, configuration);
+ }
+ if (errorWrapper.isEmpty()) {
+ this.notificationSender = new NotificationSender(producer);
+ }
+ IDistributionClientResult result;
+ if (errorWrapper.isEmpty()) {
+ isInitialized = true;
+ result = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS,
+ "distribution client initialized successfully");
+ } else {
+ result = errorWrapper.getInnerElement();
+ }
+
+ return result;
+ }
+
+ @Override
public IConfiguration getConfiguration() {
return configuration;
}
@Override
- /* see javadoc */
public synchronized IDistributionClientResult updateConfiguration(IConfiguration conf) {
log.info("update DistributionClient configuration");
@@ -126,33 +166,34 @@ public class DistributionClientImpl implements IDistributionClient {
return errorWrapper.getInnerElement();
}
- IDistributionClientResult updateResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "configuration updated successfuly");
+ IDistributionClientResult updateResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS,
+ "configuration updated successfully");
- boolean needToUpdateCambriaConsumer = false;
+ boolean needToUpdateConsumer = false;
if (conf.getRelevantArtifactTypes() != null && !conf.getRelevantArtifactTypes().isEmpty()) {
configuration.setRelevantArtifactTypes(conf.getRelevantArtifactTypes());
- needToUpdateCambriaConsumer = true;
+ needToUpdateConsumer = true;
}
if (isPollingIntervalValid(conf.getPollingInterval())) {
configuration.setPollingInterval(conf.getPollingInterval());
- needToUpdateCambriaConsumer = true;
+ needToUpdateConsumer = true;
}
if (isPollingTimeoutValid(conf.getPollingTimeout())) {
configuration.setPollingTimeout(conf.getPollingTimeout());
- needToUpdateCambriaConsumer = true;
+ needToUpdateConsumer = true;
}
if (conf.getConsumerGroup() != null) {
configuration.setConsumerGroup(conf.getConsumerGroup());
isConsumerGroupGenerated = false;
- needToUpdateCambriaConsumer = true;
+ needToUpdateConsumer = true;
} else if (!isConsumerGroupGenerated) {
String generatedConsumerGroup = UUID.randomUUID().toString();
configuration.setConsumerGroup(generatedConsumerGroup);
isConsumerGroupGenerated = true;
}
- if (needToUpdateCambriaConsumer) {
+ if (needToUpdateConsumer) {
updateResult = restartConsumer();
}
@@ -167,7 +208,7 @@ public class DistributionClientImpl implements IDistributionClient {
log.info("start DistributionClient");
IDistributionClientResult startResult;
- CambriaConsumer cambriaNotificationConsumer = null;
+ SdcKafkaConsumer kafkaConsumer = null;
Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
validateRunReady(errorWrapper);
if (errorWrapper.isEmpty()) {
@@ -175,49 +216,49 @@ public class DistributionClientImpl implements IDistributionClient {
}
if (errorWrapper.isEmpty()) {
try {
- cambriaNotificationConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(notificationTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers)
- .withSocketTimeout(configuration.getPollingTimeout() * POLLING_TIMEOUT_MULTIPLIER).build();
-
- } catch (MalformedURLException | GeneralSecurityException e) {
- handleCambriaInitFailure(errorWrapper, e);
+ kafkaConsumer = new SdcKafkaConsumer(configuration);
+ kafkaConsumer.subscribe(configuration.getNotificationTopicName());
+ } catch (KafkaException | IllegalArgumentException e) {
+ handleMessagingClientInitFailure(errorWrapper, e);
}
}
if (errorWrapper.isEmpty()) {
-
- List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes();
- // Remove nulls from list - workaround for how configuration is built
- relevantArtifactTypes.removeAll(Collections.singleton(null));
- NotificationConsumer consumer = new NotificationConsumer(cambriaNotificationConsumer, callback, relevantArtifactTypes, this);
- executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE);
- executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS);
-
- handleStatusConsumer(errorWrapper, executorPool);
+ startNotificationConsumer(kafkaConsumer);
+ startStatusConsumer(errorWrapper, executorPool);
}
if (!errorWrapper.isEmpty()) {
startResult = errorWrapper.getInnerElement();
} else {
- startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client started successfuly");
+ startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS,
+ "distribution client started successfully");
isStarted = true;
}
return startResult;
}
- private void handleStatusConsumer(Wrapper<IDistributionClientResult> errorWrapper, ScheduledExecutorService executorPool) {
+ private void startNotificationConsumer(SdcKafkaConsumer kafkaConsumer) {
+ List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes();
+ // Remove nulls from list - workaround for how configuration is built
+ relevantArtifactTypes.removeAll(Collections.singleton(null));
+ NotificationConsumer consumer = new NotificationConsumer(kafkaConsumer, callback, relevantArtifactTypes, this);
+ executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE);
+ executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS);
+ }
+
+ private void startStatusConsumer(Wrapper<IDistributionClientResult> errorWrapper, ScheduledExecutorService executorPool) {
if (configuration.isConsumeProduceStatusTopic()) {
- CambriaConsumer cambriaStatusConsumer;
try {
- cambriaStatusConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers)
- .withSocketTimeout(configuration.getPollingTimeout() * POLLING_TIMEOUT_MULTIPLIER).build();
- StatusConsumer statusConsumer = new StatusConsumer(cambriaStatusConsumer, statusCallback);
+ SdcKafkaConsumer kafkaConsumer = new SdcKafkaConsumer(configuration);
+ kafkaConsumer.subscribe(configuration.getStatusTopicName());
+ StatusConsumer statusConsumer = new StatusConsumer(kafkaConsumer, statusCallback);
executorPool.scheduleAtFixedRate(statusConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS);
- } catch (MalformedURLException | GeneralSecurityException e) {
- handleCambriaInitFailure(errorWrapper, e);
+ } catch (KafkaException | IllegalArgumentException e) {
+ handleMessagingClientInitFailure(errorWrapper, e);
}
}
}
@Override
- /* see javadoc */
public synchronized IDistributionClientResult stop() {
log.info("stop DistributionClient");
@@ -226,30 +267,13 @@ public class DistributionClientImpl implements IDistributionClient {
if (!errorWrapper.isEmpty()) {
return errorWrapper.getInnerElement();
}
- // 1. stop polling notification topic
shutdownExecutor();
- // 2. send to ASDC unregister to topic
- IDistributionClientResult unregisterResult = asdcConnector.unregisterTopics(credential);
- if (unregisterResult.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) {
- log.info("client failed to unregister from topics");
- } else {
- log.info("client unregistered from topics successfully");
- }
- asdcConnector.close();
-
- try {
- cambriaIdentityManager.deleteCurrentApiKey();
- } catch (HttpException | IOException e) {
- log.debug("failed to delete cambria keys", e);
- }
- cambriaIdentityManager.close();
-
+ sdcConnector.close();
isInitialized = false;
isTerminated = true;
- DistributionClientResultImpl stopResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client stopped successfuly");
- return stopResult;
+ return new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client stopped successfully");
}
@Override
@@ -259,167 +283,59 @@ public class DistributionClientImpl implements IDistributionClient {
validateRunReady(errorWrapper);
if (!errorWrapper.isEmpty()) {
IDistributionClientResult result = errorWrapper.getInnerElement();
- IDistributionClientDownloadResult downloadResult = new DistributionClientDownloadResultImpl(result.getDistributionActionResult(), result.getDistributionMessageResult());
- return downloadResult;
+ return new DistributionClientDownloadResultImpl(result.getDistributionActionResult(), result.getDistributionMessageResult());
}
- return asdcConnector.downloadArtifact(artifactInfo);
+ return sdcConnector.downloadArtifact(artifactInfo);
}
- @Override
- public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback,
- IStatusCallback statusCallback) {
- IDistributionClientResult initResult;
- if (!conf.isConsumeProduceStatusTopic()) {
- initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'");
-
- } else if (isNull(statusCallback)) {
- initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: statusCallback is not defined");
- } else {
- this.statusCallback = statusCallback;
- initResult = init(conf, notificationCallback);
- }
- return initResult;
+ SdcConnectorClient createSdcConnector(Configuration configuration) {
+ return new SdcConnectorClient(configuration, new HttpSdcClient(configuration.getSdcAddress(),
+ new HttpClientFactory(configuration),
+ new HttpRequestFactory(configuration.getUser(), configuration.getPassword())));
}
- @Override
- /*
- * see javadoc
- */
- public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback callback) {
-
- log.info("DistributionClient - init");
-
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateNotInitilized(errorWrapper);
- if (errorWrapper.isEmpty()) {
- validateNotTerminated(errorWrapper);
- }
- if (errorWrapper.isEmpty()) {
- this.configuration = validateAndInitConfiguration(errorWrapper, conf).getSecond();
- this.asdcConnector = createAsdcConnector(this.configuration);
- }
- // 1. get ueb server list from configuration
- if (errorWrapper.isEmpty()) {
- List<String> servers = initUebServerList(errorWrapper);
- if (servers != null) {
- this.brokerServers = servers;
- }
- }
- // 2.validate artifact types against asdc server
- if (errorWrapper.isEmpty()) {
- validateArtifactTypesWithAsdcServer(conf, errorWrapper);
- }
- // 3. create keys
- if (errorWrapper.isEmpty()) {
- this.callback = callback;
- ApiCredential apiCredential = createUebKeys(errorWrapper);
- if (apiCredential != null) {
- this.credential = apiCredential;
- }
- }
- // 4. register for topics
- if (errorWrapper.isEmpty()) {
- TopicRegistrationResponse topics = registerForTopics(errorWrapper, this.credential);
- if (topics != null) {
- this.notificationTopic = topics.getDistrNotificationTopicName();
- this.statusTopic = topics.getDistrStatusTopicName();
- this.notificationSender = createNotificationSender();
- }
- }
-
- IDistributionClientResult result;
- if (errorWrapper.isEmpty()) {
- isInitialized = true;
- result = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client initialized successfuly");
- } else {
- result = errorWrapper.getInnerElement();
- }
-
- return result;
- }
-
- SdcConnectorClient createAsdcConnector(Configuration configuration) {
- return new SdcConnectorClient(configuration, new HttpAsdcClient(configuration));
- }
-
- private NotificationSender createNotificationSender() {
- return new NotificationSender(brokerServers);
- }
-
- private TopicRegistrationResponse registerForTopics(Wrapper<IDistributionClientResult> errorWrapper, ApiCredential credential) {
- Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics = asdcConnector.registerAsdcTopics(credential);
- if (registerAsdcTopics.isRight()) {
-
- try {
- cambriaIdentityManager.deleteCurrentApiKey();
- } catch (HttpException | IOException e) {
- log.debug("failed to delete cambria keys", e);
- }
- errorWrapper.setInnerElement(registerAsdcTopics.right().value());
- } else {
- return registerAsdcTopics.left().value();
- }
- return null;
- }
-
- private ApiCredential createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) {
- ApiCredential apiCredential = null;
-
- initCambriaClient(errorWrapper);
- if (errorWrapper.isEmpty()) {
- log.debug("create keys");
- Pair<DistributionClientResultImpl, ApiCredential> uebKeys = createUebKeys();
- DistributionClientResultImpl createKeysResponse = uebKeys.getFirst();
- apiCredential = uebKeys.getSecond();
- if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) {
- errorWrapper.setInnerElement(createKeysResponse);
- }
- }
- return apiCredential;
- }
-
- private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) {
- Either<List<String>, IDistributionClientResult> eitherValidArtifactTypesList = asdcConnector.getValidArtifactTypesList();
+ private void validateArtifactTypesWithSdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) {
+ Either<List<String>, IDistributionClientResult> eitherValidArtifactTypesList = sdcConnector.getValidArtifactTypesList();
if (eitherValidArtifactTypesList.isRight()) {
DistributionActionResultEnum errorType = eitherValidArtifactTypesList.right().value().getDistributionActionResult();
- // Support the case of a new client and older ASDC Server which does not have the API
- if (errorType != DistributionActionResultEnum.ASDC_NOT_FOUND) {
+ // Support the case of a new client and older SDC Server which does not have the API
+ if (errorType != DistributionActionResultEnum.SDC_NOT_FOUND) {
errorWrapper.setInnerElement(eitherValidArtifactTypesList.right().value());
}
} else {
- final List<String> artifactTypesFromAsdc = eitherValidArtifactTypesList.left().value();
- boolean isArtifactTypesValid = artifactTypesFromAsdc.containsAll(conf.getRelevantArtifactTypes());
+ final List<String> artifactTypesFromSdc = eitherValidArtifactTypesList.left().value();
+ boolean isArtifactTypesValid = artifactTypesFromSdc.containsAll(conf.getRelevantArtifactTypes());
if (!isArtifactTypesValid) {
- List<String> invalidArtifactTypes = new ArrayList<>();
- invalidArtifactTypes.addAll(conf.getRelevantArtifactTypes());
- invalidArtifactTypes.removeAll(artifactTypesFromAsdc);
+ List<String> invalidArtifactTypes = new ArrayList<>(conf.getRelevantArtifactTypes());
+ invalidArtifactTypes.removeAll(artifactTypesFromSdc);
DistributionClientResultImpl errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_CONTAINS_INVALID_ARTIFACT_TYPES,
- "configuration contains invalid artifact types:" + invalidArtifactTypes + " valid types are:" + artifactTypesFromAsdc);
+ "configuration contains invalid artifact types:" + invalidArtifactTypes + " valid types are:" + artifactTypesFromSdc);
errorWrapper.setInnerElement(errorResponse);
} else {
- log.debug("Artifact types: {} were validated with ASDC server", conf.getRelevantArtifactTypes());
+ log.debug("Artifact types: {} were validated with SDC server", conf.getRelevantArtifactTypes());
}
}
}
- private List<String> initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) {
- List<String> brokerServers = null;
- log.debug("get ueb cluster server list from component(configuration file)");
-
- Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList();
- if (serverListResponse.isRight()) {
- errorWrapper.setInnerElement(serverListResponse.right().value());
+ private void initKafkaData(Wrapper<IDistributionClientResult> errorWrapper) {
+ log.debug("Get MessageBus cluster information from SDC");
+ Either<KafkaDataResponse, IDistributionClientResult> kafkaData = sdcConnector.getKafkaDistData();
+ if (kafkaData.isRight()) {
+ errorWrapper.setInnerElement(kafkaData.right().value());
} else {
- brokerServers = serverListResponse.left().value();
+ KafkaDataResponse kafkaDataResponse = kafkaData.left().value();
+ configuration.setMsgBusAddress(Collections.singletonList(kafkaDataResponse.getKafkaBootStrapServer()));
+ configuration.setNotificationTopicName(kafkaDataResponse.getDistrNotificationTopicName());
+ configuration.setStatusTopicName(kafkaDataResponse.getDistrStatusTopicName());
+ log.debug("MessageBus cluster info retrieved successfully {}", kafkaData.left().value());
}
-
- return brokerServers;
}
private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) {
if (isInitialized) {
log.warn("distribution client already initialized");
- DistributionClientResultImpl alreadyInitResponse = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, "distribution client already initialized");
+ DistributionClientResultImpl alreadyInitResponse = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED,
+ "distribution client already initialized");
errorWrapper.setInnerElement(alreadyInitResponse);
}
}
@@ -431,31 +347,17 @@ public class DistributionClientImpl implements IDistributionClient {
}
private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder statusBuilder) {
- IDistributionClientResult distributionResult;
-
- Either<CambriaBatchingPublisher, IDistributionClientResult> cambriaPublisher = getCambriaPublisher(statusTopic, configuration, brokerServers, credential);
- if (cambriaPublisher.isRight()) {
- distributionResult = cambriaPublisher.right().value();
- } else {
- String statusMessage = statusBuilder.build();
- distributionResult = notificationSender.send(cambriaPublisher.left().value(), statusMessage);
- }
-
- return distributionResult;
+ return notificationSender.send(configuration.getStatusTopicName(), statusBuilder.build());
}
- private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher(String statusTopic, Configuration configuration, List<String> brokerServers, ApiCredential credential) {
- CambriaBatchingPublisher cambriaPublisher = null;
+ private void initKafkaProducer(Wrapper<IDistributionClientResult> errorWrapper, Configuration configuration) {
try {
- cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap())
- .usingHosts(brokerServers).build();
- cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
- } catch (MalformedURLException | GeneralSecurityException e) {
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- handleCambriaInitFailure(errorWrapper, e);
- return Either.right(errorWrapper.getInnerElement());
+ if (producer == null) {
+ producer = new SdcKafkaProducer(configuration);
+ }
+ } catch (KafkaException | IllegalStateException e) {
+ handleMessagingClientInitFailure(errorWrapper, e);
}
- return Either.left(cambriaPublisher);
}
@Override
@@ -471,27 +373,17 @@ public class DistributionClientImpl implements IDistributionClient {
if (!errorWrapper.isEmpty()) {
return errorWrapper.getInnerElement();
}
- IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified);
+ IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(
+ getConfiguration().getConsumerID(),
+ currentTimeMillis,
+ distributionId,
+ artifactInfo,
+ isNotified);
return sendStatus(builder);
}
/* *************************** Private Methods *************************************************** */
- protected Pair<DistributionClientResultImpl, ApiCredential> createUebKeys() {
- DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly");
- ApiCredential credential = null;
- try {
- String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID());
- credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description);
- cambriaIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
-
- } catch (HttpException | CambriaApiException | IOException e) {
- response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage());
- log.error(response.toString());
- }
- return new Pair<>(response, credential);
- }
-
private IDistributionClientResult restartConsumer() {
shutdownExecutor();
return start();
@@ -500,43 +392,36 @@ public class DistributionClientImpl implements IDistributionClient {
protected Pair<DistributionActionResultEnum, Configuration> validateAndInitConfiguration(Wrapper<IDistributionClientResult> errorWrapper, IConfiguration conf) {
DistributionActionResultEnum result = configurationValidator.validateConfiguration(conf, statusCallback);
- Configuration configuration = null;
+ Configuration configurationInit = null;
if (result == DistributionActionResultEnum.SUCCESS) {
- configuration = createConfiguration(conf);
+ configurationInit = createConfiguration(conf);
} else {
DistributionClientResultImpl initResult = new DistributionClientResultImpl(result, "configuration is invalid: " + result.name());
log.error(initResult.toString());
errorWrapper.setInnerElement(initResult);
}
- return new Pair<>(result, configuration);
+ return new Pair<>(result, configurationInit);
}
private Configuration createConfiguration(IConfiguration conf) {
- Configuration configuration = new Configuration(conf);
+ Configuration configurationCreate = new Configuration(conf);
if (!isPollingIntervalValid(conf.getPollingInterval())) {
- configuration.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC);
+ configurationCreate.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC);
}
if (!isPollingTimeoutValid(conf.getPollingTimeout())) {
- configuration.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC);
+ configurationCreate.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC);
}
if (conf.getConsumerGroup() == null) {
String generatedConsumerGroup = UUID.randomUUID().toString();
- configuration.setConsumerGroup(generatedConsumerGroup);
+ configurationCreate.setConsumerGroup(generatedConsumerGroup);
isConsumerGroupGenerated = true;
}
-
//Default use HTTPS with SDC
if (conf.isUseHttpsWithSDC() == null) {
- configuration.setUseHttpsWithSDC(true);
+ configurationCreate.setUseHttpsWithSDC(true);
}
-
- //Default use HTTPS with DMAAP
- if (conf.isUseHttpsWithDmaap() == null) {
- configuration.setUseHttpsWithDmaap(true);
- }
-
- return configuration;
+ return configurationCreate;
}
private void shutdownExecutor() {
@@ -577,7 +462,8 @@ public class DistributionClientImpl implements IDistributionClient {
private void validateInitialized(Wrapper<IDistributionClientResult> errorWrapper) {
if (!isInitialized) {
log.debug("client was not initialized");
- IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized");
+ IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED,
+ "distribution client was not initialized");
errorWrapper.setInnerElement(result);
}
}
@@ -585,7 +471,8 @@ public class DistributionClientImpl implements IDistributionClient {
private void validateNotStarted(Wrapper<IDistributionClientResult> errorWrapper) {
if (isStarted) {
log.debug("client already started");
- IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_STARTED, "distribution client already started");
+ IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_STARTED,
+ "distribution client already started");
errorWrapper.setInnerElement(result);
}
}
@@ -593,7 +480,8 @@ public class DistributionClientImpl implements IDistributionClient {
private void validateNotTerminated(Wrapper<IDistributionClientResult> errorWrapper) {
if (isTerminated) {
log.debug("client was terminated");
- IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_IS_TERMINATED, "distribution client was terminated");
+ IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_IS_TERMINATED,
+ "distribution client was terminated");
errorWrapper.setInnerElement(result);
}
}
@@ -616,23 +504,10 @@ public class DistributionClientImpl implements IDistributionClient {
return isValid;
}
- private synchronized void initCambriaClient(Wrapper<IDistributionClientResult> errorWrapper) {
- if (cambriaIdentityManager == null) {
- try {
- AbstractAuthenticatedManagerBuilder<CambriaIdentityManager> managerBuilder = new IdentityManagerBuilder().usingHosts(brokerServers);
- if (configuration.isUseHttpsWithDmaap()) {
- managerBuilder = managerBuilder.usingHttps();
- }
- cambriaIdentityManager = managerBuilder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- handleCambriaInitFailure(errorWrapper, e);
- }
- }
- }
- private void handleCambriaInitFailure(Wrapper<IDistributionClientResult> errorWrapper, Exception e) {
- final String errorMessage = "Failed initilizing cambria component:" + e.getMessage();
- IDistributionClientResult errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CAMBRIA_INIT_FAILED, errorMessage);
+ private void handleMessagingClientInitFailure(Wrapper<IDistributionClientResult> errorWrapper, Exception e) {
+ final String errorMessage = "Failed initializing messaging component:" + e.getMessage();
+ IDistributionClientResult errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.MESSAGING_CLIENT_INIT_FAILED, errorMessage);
errorWrapper.setInnerElement(errorResponse);
log.error(errorMessage);
log.debug(errorMessage, e);
@@ -682,8 +557,7 @@ public class DistributionClientImpl implements IDistributionClient {
String vfModuleJsonString = new String(artifactPayload, StandardCharsets.UTF_8);
final Type type = new TypeToken<List<VfModuleMetadata>>() {
}.getType();
- List<IVfModuleMetadata> vfModules = gson.fromJson(vfModuleJsonString, type);
- return vfModules;
+ return gson.fromJson(vfModuleJsonString, type);
}
@@ -702,27 +576,12 @@ public class DistributionClientImpl implements IDistributionClient {
}
-
- public Either<List<String>, IDistributionClientResult> getUEBServerList() {
- List<String> msgBusAddresses = configuration.getMsgBusAddress();
- if (msgBusAddresses.isEmpty()) {
- return Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS, "Message bus address was not found in the config file"));
- } else if (getHttpProxyHost() == null && getHttpsProxyHost() == null) {
- // If there is no proxy configured, convert to valid host name
- return GeneralUtils.convertToValidHostName(msgBusAddresses);
- } else {
- // skip the IP address lookup when proxy is configured and treat all
- // hosts as valid
- return Either.left(msgBusAddresses);
- }
- }
private HttpHost getHttpProxyHost() {
HttpHost proxyHost = null;
- if (configuration.isUseSystemProxy() && System.getProperty("http.proxyHost") != null
- && System.getProperty("http.proxyPort") != null) {
+ if (Boolean.TRUE.equals(configuration.isUseSystemProxy() && System.getProperty("http.proxyHost") != null) && System.getProperty("http.proxyPort") != null) {
proxyHost = new HttpHost(System.getProperty("http.proxyHost"),
- Integer.valueOf(System.getProperty("http.proxyPort")));
+ Integer.parseInt(System.getProperty("http.proxyPort")));
} else if (configuration.getHttpProxyHost() != null && configuration.getHttpProxyPort() != 0) {
proxyHost = new HttpHost(configuration.getHttpProxyHost(), configuration.getHttpProxyPort());
}
@@ -731,10 +590,9 @@ public class DistributionClientImpl implements IDistributionClient {
private HttpHost getHttpsProxyHost() {
HttpHost proxyHost = null;
- if (configuration.isUseSystemProxy() && System.getProperty("https.proxyHost") != null
- && System.getProperty("https.proxyPort") != null) {
+ if (configuration.isUseSystemProxy() && System.getProperty("https.proxyHost") != null && System.getProperty("https.proxyPort") != null) {
proxyHost = new HttpHost(System.getProperty("https.proxyHost"),
- Integer.valueOf(System.getProperty("https.proxyPort")));
+ Integer.parseInt(System.getProperty("https.proxyPort")));
} else if (configuration.getHttpsProxyHost() != null && configuration.getHttpsProxyPort() != 0) {
proxyHost = new HttpHost(configuration.getHttpsProxyHost(), configuration.getHttpsProxyPort());
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java
index 4edd355..77655ac 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java
@@ -25,8 +25,8 @@ import org.onap.sdc.utils.DistributionActionResultEnum;
public class DistributionClientResultImpl implements IDistributionClientResult {
- private DistributionActionResultEnum responseStatus;
- private String responseMessage;
+ private final DistributionActionResultEnum responseStatus;
+ private final String responseMessage;
public DistributionClientResultImpl(DistributionActionResultEnum responseStatus, String responseMessage) {
this.responseStatus = responseStatus;
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java
index bf28d97..c59612a 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java
@@ -20,34 +20,32 @@
package org.onap.sdc.impl;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.List;
-
+import org.onap.sdc.api.consumer.INotificationCallback;
import org.onap.sdc.api.notification.IArtifactInfo;
+import org.onap.sdc.api.notification.INotificationData;
import org.onap.sdc.api.notification.IResourceInstance;
import org.onap.sdc.api.results.IDistributionClientResult;
-import org.onap.sdc.utils.DistributionActionResultEnum;
-import org.onap.sdc.api.consumer.INotificationCallback;
-import org.onap.sdc.api.notification.INotificationData;
import org.onap.sdc.utils.ArtifactTypeEnum;
+import org.onap.sdc.utils.DistributionActionResultEnum;
+import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
class NotificationConsumer implements Runnable {
- private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName());
+ private static final Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName());
- private CambriaConsumer cambriaConsumer;
- private INotificationCallback clientCallback;
- private List<String> artifactsTypes;
- private DistributionClientImpl distributionClient;
+ private final SdcKafkaConsumer kafkaConsumer;
+ private final INotificationCallback clientCallback;
+ private final List<String> artifactsTypes;
+ private final DistributionClientImpl distributionClient;
- NotificationConsumer(CambriaConsumer cambriaConsumer, INotificationCallback clientCallback, List<String> artifactsTypes, DistributionClientImpl distributionClient) {
- this.cambriaConsumer = cambriaConsumer;
+ NotificationConsumer(SdcKafkaConsumer kafkaConsumer, INotificationCallback clientCallback, List<String> artifactsTypes, DistributionClientImpl distributionClient) {
+ this.kafkaConsumer = kafkaConsumer;
this.clientCallback = clientCallback;
this.artifactsTypes = artifactsTypes;
this.distributionClient = distributionClient;
@@ -55,16 +53,16 @@ class NotificationConsumer implements Runnable {
@Override
public void run() {
-
try {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
long currentTimeMillis = System.currentTimeMillis();
- for (String notificationMsg : cambriaConsumer.fetch()) {
+ log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName());
+ for (String notificationMsg : kafkaConsumer.poll()) {
log.debug("received message from topic");
- log.debug("recieved notification from broker: {}", notificationMsg);
+ log.debug("received notification from broker: {}", notificationMsg);
- final NotificationDataImpl notificationFromUEB = gson.fromJson(notificationMsg, NotificationDataImpl.class);
- NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromUEB);
+ final NotificationDataImpl notificationFromMessageBus = gson.fromJson(notificationMsg, NotificationDataImpl.class);
+ NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromMessageBus);
if (isActivateCallback(notificationForCallback)) {
String stringNotificationForCallback = gson.toJson(notificationForCallback);
log.debug("sending notification to client: {}", stringNotificationForCallback);
@@ -73,8 +71,8 @@ class NotificationConsumer implements Runnable {
}
} catch (Exception e) {
- log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage());
- log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e);
+ log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage());
+ log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e);
}
}
@@ -85,21 +83,21 @@ class NotificationConsumer implements Runnable {
return hasRelevantArtifactsInResourceInstance || hasRelevantArtifactsInService;
}
- protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromUEB) {
- List<IResourceInstance> relevantResourceInstances = buildResourceInstancesLogic(notificationFromUEB, currentTimeMillis);
- List<ArtifactInfoImpl> relevantServiceArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, notificationFromUEB.getServiceArtifactsImpl());
- notificationFromUEB.setResources(relevantResourceInstances);
- notificationFromUEB.setServiceArtifacts(relevantServiceArtifacts);
- return notificationFromUEB;
+ protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromMessageBus) {
+ List<IResourceInstance> relevantResourceInstances = buildResourceInstancesLogic(notificationFromMessageBus, currentTimeMillis);
+ List<ArtifactInfoImpl> relevantServiceArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, notificationFromMessageBus.getServiceArtifactsImpl());
+ notificationFromMessageBus.setResources(relevantResourceInstances);
+ notificationFromMessageBus.setServiceArtifacts(relevantServiceArtifacts);
+ return notificationFromMessageBus;
}
- private List<IResourceInstance> buildResourceInstancesLogic(NotificationDataImpl notificationFromUEB, long currentTimeMillis) {
+ private List<IResourceInstance> buildResourceInstancesLogic(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis) {
List<IResourceInstance> relevantResourceInstances = new ArrayList<>();
- for (JsonContainerResourceInstance resourceInstance : notificationFromUEB.getResourcesImpl()) {
+ for (JsonContainerResourceInstance resourceInstance : notificationFromMessageBus.getResourcesImpl()) {
final List<ArtifactInfoImpl> artifactsImplList = resourceInstance.getArtifactsImpl();
- List<ArtifactInfoImpl> foundRelevantArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, artifactsImplList);
+ List<ArtifactInfoImpl> foundRelevantArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, artifactsImplList);
if (!foundRelevantArtifacts.isEmpty() || distributionClient.getConfiguration().isFilterInEmptyResources()) {
resourceInstance.setArtifacts(foundRelevantArtifacts);
relevantResourceInstances.add(resourceInstance);
@@ -109,17 +107,17 @@ class NotificationConsumer implements Runnable {
}
- private List<ArtifactInfoImpl> handleRelevantArtifacts(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList) {
+ private List<ArtifactInfoImpl> handleRelevantArtifacts(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList) {
List<ArtifactInfoImpl> relevantArtifacts = new ArrayList<>();
if (artifactsImplList != null) {
for (ArtifactInfoImpl artifactInfo : artifactsImplList) {
- handleRelevantArtifact(notificationFromUEB, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo);
+ handleRelevantArtifact(notificationFromMessageBus, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo);
}
}
return relevantArtifacts;
}
- private void handleRelevantArtifact(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList, List<ArtifactInfoImpl> relevantArtifacts, ArtifactInfoImpl artifactInfo) {
+ private void handleRelevantArtifact(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList, List<ArtifactInfoImpl> relevantArtifacts, ArtifactInfoImpl artifactInfo) {
boolean isArtifactRelevant = artifactsTypes.contains(artifactInfo.getArtifactType());
String artifactType = artifactInfo.getArtifactType();
if (artifactInfo.getGeneratedFromUUID() != null && !artifactInfo.getGeneratedFromUUID().isEmpty()) {
@@ -131,16 +129,16 @@ class NotificationConsumer implements Runnable {
}
}
if (isArtifactRelevant) {
- setRelatedArtifacts(artifactInfo, notificationFromUEB);
+ setRelatedArtifacts(artifactInfo, notificationFromMessageBus);
if (artifactType.equals(ArtifactTypeEnum.HEAT.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_VOL.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_NET.name())) {
setGeneratedArtifact(artifactsImplList, artifactInfo);
}
relevantArtifacts.add(artifactInfo);
}
- IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromUEB.getDistributionID(), artifactInfo, isArtifactRelevant);
+ IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromMessageBus.getDistributionID(), artifactInfo, isArtifactRelevant);
if (notificationStatus.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) {
- log.error("Error failed to send notification status to UEB failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult());
+ log.error("Error failed to send notification status to MessageBus failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult());
}
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java
index 5951ed0..2c69330 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java
@@ -20,24 +20,23 @@
package org.onap.sdc.impl;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import org.onap.sdc.api.consumer.IStatusCallback;
import org.onap.sdc.api.notification.IStatusData;
+import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
class StatusConsumer implements Runnable {
- private static Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName());
+ private static final Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName());
- private CambriaConsumer cambriaConsumer;
- private IStatusCallback clientCallback;
+ private final SdcKafkaConsumer kafkaConsumer;
+ private final IStatusCallback clientCallback;
- StatusConsumer(CambriaConsumer cambriaConsumer, IStatusCallback clientCallback) {
- this.cambriaConsumer = cambriaConsumer;
+ StatusConsumer(SdcKafkaConsumer kafkaConsumer, IStatusCallback clientCallback) {
+ this.kafkaConsumer = kafkaConsumer;
this.clientCallback = clientCallback;
}
@@ -46,18 +45,16 @@ class StatusConsumer implements Runnable {
try {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
- for (String statusMsg : cambriaConsumer.fetch()) {
+ log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName());
+ for (String statusMsg : kafkaConsumer.poll()) {
log.debug("received message from topic");
- log.debug("recieved notification from broker: {}", statusMsg);
+ log.debug("received notification from broker: {}", statusMsg);
IStatusData statusData = gson.fromJson(statusMsg, StatusDataImpl.class);
clientCallback.activateCallback(statusData);
-
-
}
-
} catch (Exception e) {
- log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage());
- log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e);
+ log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage());
+ log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e);
}
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java
index 514630f..834751a 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java
@@ -36,23 +36,19 @@ public enum DistributionActionResultEnum {
CONFIGURATION_IS_MISSING,
CONF_MISSING_USERNAME,
CONF_MISSING_PASSWORD,
- CONF_MISSING_ASDC_FQDN,
+ CONF_MISSING_SDC_FQDN,
CONF_MISSING_ARTIFACT_TYPES,
CONF_CONTAINS_INVALID_ARTIFACT_TYPES,
CONF_MISSING_CONSUMER_ID,
CONF_MISSING_ENVIRONMENT_NAME,
- CONF_MISSING_CONSUMER_GROUP,
- CONF_INVALID_ASDC_FQDN,
+ CONF_INVALID_SDC_FQDN,
CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG,
- CONF_MISSING_MSG_BUS_ADDRESS,
- CONF_INVALID_MSG_BUS_ADDRESS,
- ASDC_AUTHENTICATION_FAILED,
- ASDC_AUTHORIZATION_FAILED,
- ASDC_NOT_FOUND,
- ASDC_SERVER_PROBLEM,
- ASDC_CONNECTION_FAILED,
- ASDC_SERVER_TIMEOUT,
+ SDC_AUTHENTICATION_FAILED,
+ SDC_AUTHORIZATION_FAILED,
+ SDC_NOT_FOUND,
+ SDC_SERVER_PROBLEM,
+ SDC_CONNECTION_FAILED,
+ SDC_SERVER_TIMEOUT,
- CAMBRIA_INIT_FAILED,
- UEB_KEYS_CREATION_FAILED
+ MESSAGING_CLIENT_INIT_FAILED
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java
index 8432611..f4ebbcc 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java
@@ -28,7 +28,7 @@ import java.util.regex.Pattern;
* @author mshitrit
*/
public final class DistributionClientConstants {
- public static final String CLIENT_DESCRIPTION = "ASDC Distribution Client Key for %s";
+ public static final String CLIENT_DESCRIPTION = "SDC Distribution Client Key for %s";
public static final Pattern FQDN_PATTERN = Pattern.compile(
"^"
+ "([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9])(\\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9]))*(:[0-9]{2,5})*$", Pattern.CASE_INSENSITIVE);
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java
index ff5d201..6a69d8b 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java
@@ -70,24 +70,4 @@ public class GeneralUtils {
}
return isEncoded;
}
-
-
- public static Either<List<String>, IDistributionClientResult> convertToValidHostName(List<String> msgBusAddresses) {
- List<String> uebLocalHostsNames = new ArrayList<>();
- for (String name : msgBusAddresses) {
- try {
- uebLocalHostsNames.add(InetAddress.getByName(name).getHostName());
- } catch (UnknownHostException e) {
- LOGGER.debug("UnknownHost: {}", e.getMessage(), e);
- }
- }
- Either<List<String>, IDistributionClientResult> response;
- if (uebLocalHostsNames.isEmpty()) {
- response = Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS, "configuration is invalid: " + DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS.name()));
-
- } else {
- response = Either.left(uebLocalHostsNames);
- }
- return response;
- }
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java
index 1fb71a6..44a9ddb 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java
@@ -20,55 +20,46 @@
package org.onap.sdc.utils;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaPublisher;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.KafkaException;
import org.onap.sdc.api.results.IDistributionClientResult;
import org.onap.sdc.impl.DistributionClientResultImpl;
+import org.onap.sdc.utils.kafka.SdcKafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
public class NotificationSender {
private static final Logger log = LoggerFactory.getLogger(NotificationSender.class);
- private static final long PUBLISHER_CLOSING_TIMEOUT = 10L;
private static final long SLEEP_TIME = 1;
+ private final SdcKafkaProducer producer;
- private final List<String> brokerServers;
-
- public NotificationSender(List<String> brokerServers) {
- this.brokerServers = brokerServers;
+ public NotificationSender(SdcKafkaProducer producer) {
+ this.producer = producer;
}
- public IDistributionClientResult send(CambriaBatchingPublisher publisher, String status) {
+ public IDistributionClientResult send(String topic, String status) {
log.info("DistributionClient - sendStatus");
DistributionClientResultImpl distributionResult;
try {
- log.debug("Publisher server list: {}", brokerServers);
- log.debug("Trying to send status: {}", status);
- publisher.send("MyPartitionKey", status);
+ log.debug("Publisher server list: {}", producer.getMsgBusAddresses());
+ log.info("Trying to send status: {} \n to topic {}", status, producer.getTopicName());
+ producer.send(topic, "MyPartitionKey", status);
TimeUnit.SECONDS.sleep(SLEEP_TIME);
- } catch (IOException | InterruptedException e) {
- log.error("DistributionClient - sendDownloadStatus. Failed to send download status", e);
+ } catch (KafkaException | InterruptedException e) {
+ log.error("DistributionClient - sendStatus. Failed to send status", e);
} finally {
- distributionResult = closePublisher(publisher);
+ distributionResult = closeProducer();
}
return distributionResult;
}
- private DistributionClientResultImpl closePublisher(CambriaBatchingPublisher publisher) {
+ private DistributionClientResultImpl closeProducer() {
DistributionClientResultImpl distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
try {
- List<CambriaPublisher.message> notSentMessages = publisher.close(PUBLISHER_CLOSING_TIMEOUT, TimeUnit.SECONDS);
- if (notSentMessages.isEmpty()) {
- distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
- } else {
- log.debug("DistributionClient - sendDownloadStatus. {} messages were not sent", notSentMessages.size());
- }
- } catch (IOException | InterruptedException e) {
+ producer.flush();
+ distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
+ } catch (KafkaException | IllegalArgumentException e) {
log.error("DistributionClient - sendDownloadStatus. Failed to send messages and close publisher.", e);
}
return distributionResult;
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/AsdcUrls.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/KafkaDataResponse.java
index b1d51d9..ac1d2ea 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/AsdcUrls.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/KafkaDataResponse.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* sdc-distribution-client
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,17 +18,18 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.sdc.http;
+package org.onap.sdc.utils.kafka;
-public class AsdcUrls {
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
- private AsdcUrls() {
+@Getter
+@Setter
+@NoArgsConstructor
+public class KafkaDataResponse {
- }
-
- public static final String GET_CLUSTER_SERVER_LIST = "/sdc/v1/distributionUebCluster";
- public static final String GET_VALID_ARTIFACT_TYPES = "/sdc/v1/artifactTypes";
- public static final String POST_FOR_TOPIC_REGISTRATION = "/sdc/v1/registerForDistribution";
- public static final String POST_FOR_UNREGISTER = "/sdc/v1/unRegisterForDistribution";
-
-}
+ private String kafkaBootStrapServer;
+ private String distrNotificationTopicName;
+ private String distrStatusTopicName;
+} \ No newline at end of file
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
new file mode 100644
index 0000000..71f793d
--- /dev/null
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
@@ -0,0 +1,100 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.sdc.utils.kafka;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.onap.sdc.impl.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that provides a KafkaConsumer to communicate with a kafka cluster
+ */
+public class SdcKafkaConsumer {
+
+ private static final Logger log = LoggerFactory.getLogger(SdcKafkaConsumer.class);
+ final KafkaConsumer<String, String> consumer;
+ private final int pollTimeout;
+ private String topicName;
+
+ /**
+ *
+ * @param configuration The config provided to the client
+ */
+ public SdcKafkaConsumer(Configuration configuration) {
+ Properties props = new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getMsgBusAddress());
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig());
+ props.put(SaslConfigs.SASL_MECHANISM, configuration.getKafkaSaslMechanism());
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.getConsumerGroup());
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ consumer = new KafkaConsumer<>(props);
+ pollTimeout = configuration.getPollingTimeout();
+ }
+
+ /**
+ *
+ * @param topic The kafka topic to subscribe to
+ */
+ public void subscribe(String topic) {
+ try {
+ consumer.subscribe(Collections.singleton(topic));
+ this.topicName = topic;
+ }
+ catch (InvalidGroupIdException e) {
+ log.error("Invalid Group {}", e.getMessage());
+ }
+ }
+
+ /**
+ *
+ * @return The list of records returned from the poll
+ */
+ public List<String> poll() {
+ List<String> msgs = new ArrayList<>();
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(pollTimeout));
+ for (ConsumerRecord<String, String> rec : records) {
+ msgs.add(rec.value());
+ }
+ return msgs;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+} \ No newline at end of file
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaProducer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaProducer.java
new file mode 100644
index 0000000..9826f8b
--- /dev/null
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaProducer.java
@@ -0,0 +1,98 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.sdc.utils.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.onap.sdc.impl.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that provides a KafkaProducer to communicate with a kafka cluster
+ */
+public class SdcKafkaProducer {
+
+ private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class);
+ final KafkaProducer<String, String> producer;
+ private final List<String> msgBusAddresses;
+ private final String topicName;
+
+ public SdcKafkaProducer(Configuration configuration) {
+ Properties props = new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getMsgBusAddress());
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig());
+ props.put(SaslConfigs.SASL_MECHANISM, configuration.getKafkaSaslMechanism());
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig());
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-producer-" + UUID.randomUUID());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ producer = new KafkaProducer<>(props);
+ msgBusAddresses = configuration.getMsgBusAddress();
+ topicName = configuration.getStatusTopicName();
+ }
+
+ /**
+ *
+ * @param topicName The name of the topic to publish to
+ * @param key The key value of the ProducerRecord
+ * @param value The value of the ProducerRecord
+ * @return The RecordMetedata of the request
+ */
+ public Future<RecordMetadata> send(String topicName, String key, String value) {
+ Future<RecordMetadata> data;
+ try {
+ data = producer.send(new ProducerRecord<>(topicName, key, value));
+ } catch (KafkaException e) {
+ log.error("Failed the send data: exc {}", e.getMessage());
+ throw e;
+ }
+ return data;
+ }
+ /**
+ *
+ */
+ public void flush() {
+ try {
+ producer.flush();
+ }
+ catch (KafkaException e) {
+ log.error("Failed to send data: exc {}", e.getMessage());
+ }
+ }
+
+ public List<String> getMsgBusAddresses() {
+ return msgBusAddresses;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+} \ No newline at end of file
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/api/asdc/RegistrationRequestTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/api/asdc/RegistrationRequestTest.java
deleted file mode 100644
index 304cb56..0000000
--- a/sdc-distribution-client/src/test/java/org/onap/sdc/api/asdc/RegistrationRequestTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * SDC
- * ================================================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.sdc.api.asdc;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.Collections;
-import java.util.List;
-import org.junit.jupiter.api.Test;
-
-class RegistrationRequestTest {
-
- private static final List<String> DIST_ENV_END_POINTS = Collections.emptyList();
- private static final boolean IS_CONSUMER_TO_SDC_DISTR_STATUS_TOPIC = true;
- private static final String ENV_NAME = "ENV_NAME";
- private static final String API_KEY = "API_KEY";
-
- @Test
- void testConstructorShouldSetProperties() {
- RegistrationRequest registrationRequest =
- new RegistrationRequest(API_KEY, ENV_NAME, IS_CONSUMER_TO_SDC_DISTR_STATUS_TOPIC, DIST_ENV_END_POINTS);
- assertEquals(API_KEY, registrationRequest.getApiPublicKey());
- assertEquals(DIST_ENV_END_POINTS, registrationRequest.getDistEnvEndPoints());
- assertEquals(ENV_NAME, registrationRequest.getDistrEnvName());
- assertTrue(registrationRequest.getIsConsumerToSdcDistrStatusTopic());
- }
-}
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientResponseTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientResponseTest.java
index 8a912d9..4ff01c5 100644
--- a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientResponseTest.java
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientResponseTest.java
@@ -33,8 +33,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
-class HttpAsdcClientResponseTest {
-
+class HttpSdcClientResponseTest {
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{HttpStatus.SC_INTERNAL_SERVER_ERROR, "failed to send request"},
@@ -47,14 +46,14 @@ class HttpAsdcClientResponseTest {
@MethodSource("data")
void shouldCreateHttpResponse(int httpStatusCode, String httpMessage) throws IOException {
// when
- final HttpAsdcResponse response = HttpAsdcClient.createHttpResponse(httpStatusCode, httpMessage);
+ final HttpSdcResponse response = HttpSdcClient.createHttpResponse(httpStatusCode, httpMessage);
// then
assertEquals(httpStatusCode, response.getStatus());
assertEquals(httpMessage, getResponseMessage(response));
}
- private String getResponseMessage(HttpAsdcResponse response) throws IOException {
+ private String getResponseMessage(HttpSdcResponse response) throws IOException {
return IOUtils.toString(response.getMessage().getContent(), StandardCharsets.UTF_8);
}
-}
+ }
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientTest.java
index 2dcfd5d..cbd12c3 100644
--- a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientTest.java
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientTest.java
@@ -43,8 +43,7 @@ import org.onap.sdc.utils.Pair;
import org.onap.sdc.utils.TestConfiguration;
@ExtendWith(MockitoExtension.class)
-class HttpAsdcClientTest {
-
+class HttpSdcClientTest {
private static final String URL = "http://127.0.0.1:8080/target";
private static final int HTTP_OK = 200;
private static final String K_1 = "k1";
@@ -71,14 +70,14 @@ class HttpAsdcClientTest {
final HttpRequestFactory httpRequestFactory = new HttpRequestFactory(
configuration.getUser(),
configuration.getPassword());
- final HttpAsdcClient httpAsdcClient = new HttpAsdcClient(
- configuration.getAsdcAddress(),
+ final HttpSdcClient httpSdcClient = new HttpSdcClient(
+ configuration.getSdcAddress(),
new HttpClientFactory(configuration),
httpRequestFactory);
// then
- assertNotNull(httpAsdcClient);
- assertEquals(HttpClientFactory.HTTP, httpAsdcClient.getHttpSchema());
+ assertNotNull(httpSdcClient);
+ assertEquals(HttpClientFactory.HTTP, httpSdcClient.getHttpSchema());
}
@Test
@@ -91,25 +90,25 @@ class HttpAsdcClientTest {
final HttpRequestFactory httpRequestFactory = new HttpRequestFactory(
configuration.getUser(),
configuration.getPassword());
- final HttpAsdcClient httpAsdcClient = new HttpAsdcClient(
- configuration.getAsdcAddress(),
+ final HttpSdcClient httpSdcClient = new HttpSdcClient(
+ configuration.getSdcAddress(),
new HttpClientFactory(configuration),
httpRequestFactory);
// then
- assertNotNull(httpAsdcClient);
- assertEquals(HttpClientFactory.HTTPS, httpAsdcClient.getHttpSchema());
+ assertNotNull(httpSdcClient);
+ assertEquals(HttpClientFactory.HTTPS, httpSdcClient.getHttpSchema());
}
@Test
void shouldSendGetRequestWithoutAnyError() throws IOException {
// given
TestConfiguration configuration = givenHttpConfiguration();
- final HttpAsdcClient httpAsdcClient = createTestObj(HttpClientFactory.HTTP, configuration, httpClient);
+ final HttpSdcClient httpSdcClient = createTestObj(HttpClientFactory.HTTP, configuration, httpClient);
CloseableHttpResponse httpResponse = givenHttpResponse(true);
// when
- final HttpAsdcResponse response = httpAsdcClient.getRequest(URL, HEADERS_MAP);
+ final HttpSdcResponse response = httpSdcClient.getRequest(URL, HEADERS_MAP);
// then
assertThat(response).isNotNull();
@@ -127,11 +126,11 @@ class HttpAsdcClientTest {
void shouldSendPostRequestWithoutAnyError() throws IOException {
// given
TestConfiguration configuration = givenHttpConfiguration();
- final HttpAsdcClient httpAsdcClient = createTestObj(HttpClientFactory.HTTP, configuration, httpClient);
+ final HttpSdcClient httpSdcClient = createTestObj(HttpClientFactory.HTTP, configuration, httpClient);
CloseableHttpResponse httpResponse = givenHttpResponse(false);
// when
- final HttpAsdcResponse response = httpAsdcClient.postRequest(URL, httpEntity, HEADERS_MAP);
+ final HttpSdcResponse response = httpSdcClient.postRequest(URL,httpEntity, HEADERS_MAP);
// then
assertThat(response).isNotNull();
@@ -141,17 +140,17 @@ class HttpAsdcClientTest {
}
- private HttpAsdcClient createTestObj(String httpProtocol, TestConfiguration configuration, CloseableHttpClient httpClient) {
+ private HttpSdcClient createTestObj(String httpProtocol, TestConfiguration configuration, CloseableHttpClient httpClient) {
final HttpRequestFactory httpRequestFactory = new HttpRequestFactory(
configuration.getUser(),
configuration.getPassword());
HttpClientFactory httpClientFactory = mock(HttpClientFactory.class);
when(httpClientFactory.createInstance()).thenReturn(new Pair<>(httpProtocol, httpClient));
- final HttpAsdcClient httpAsdcClient = new HttpAsdcClient(
- configuration.getAsdcAddress(),
+ final HttpSdcClient httpSdcClient = new HttpSdcClient(
+ configuration.getSdcAddress(),
httpClientFactory,
httpRequestFactory);
- return httpAsdcClient;
+ return httpSdcClient;
}
private CloseableHttpResponse givenHttpResponse(HttpEntity httpEntity, Header[] headers, boolean includeGetAllHeaders) {
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpClientFactoryTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpClientFactoryTest.java
index 347b7f5..2292fc4 100644
--- a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpClientFactoryTest.java
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpClientFactoryTest.java
@@ -41,7 +41,7 @@ class HttpClientFactoryTest {
TestConfiguration config = spy(new TestConfiguration());
HttpClientFactory httpClientFactory = new HttpClientFactory(config);
when(config.activateServerTLSAuth()).thenReturn(true);
- when(config.getKeyStorePath()).thenReturn("src/test/resources/asdc-client.jks");
+ when(config.getKeyStorePath()).thenReturn("src/test/resources/sdc-client.jks");
when(config.getKeyStorePassword()).thenReturn("Aa123456");
Pair<String, CloseableHttpClient> client = httpClientFactory.createInstance();
SSLConnectionSocketFactory sslsf = spy(SSLConnectionSocketFactory.getSocketFactory());
@@ -74,13 +74,13 @@ class HttpClientFactoryTest {
}
@Test
- void shouldReturnSSLConnectionError() throws HttpAsdcClientException {
+ void shouldReturnSSLConnectionError() throws HttpSdcClientException{
TestConfiguration config = spy(new TestConfiguration());
HttpClientFactory httpClientFactory = new HttpClientFactory(config);
when(config.activateServerTLSAuth()).thenReturn(true);
when(config.getKeyStorePath()).thenReturn("src/test/resources/dummy.jks");
when(config.getKeyStorePassword()).thenReturn("Aa123456");
- assertThrows(HttpAsdcClientException.class, () -> httpClientFactory.createInstance());
+ assertThrows(HttpSdcClientException.class, () -> httpClientFactory.createInstance());
}
}
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/http/SdcConnectorClientTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/http/SdcConnectorClientTest.java
index b09de78..61b8388 100644
--- a/sdc-distribution-client/src/test/java/org/onap/sdc/http/SdcConnectorClientTest.java
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/http/SdcConnectorClientTest.java
@@ -22,20 +22,14 @@
package org.onap.sdc.http;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.att.nsa.apiClient.credentials.ApiCredential;
import com.google.common.hash.Hashing;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -43,199 +37,180 @@ import fj.data.Either;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Matchers;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.onap.sdc.api.asdc.RegistrationRequest;
import org.onap.sdc.api.consumer.IConfiguration;
import org.onap.sdc.api.notification.IArtifactInfo;
import org.onap.sdc.api.results.IDistributionClientResult;
-import org.onap.sdc.impl.DistributionClientResultImpl;
import org.onap.sdc.utils.DistributionActionResultEnum;
import org.onap.sdc.utils.Pair;
+import org.onap.sdc.utils.kafka.KafkaDataResponse;
-class SdcConnectorClientTest {
+public class SdcConnectorClientTest {
+
+ private static final Gson gson = new GsonBuilder().create();
+ private static final HttpSdcClient httpClient = mock(HttpSdcClient.class);
+ private static final IConfiguration configuration = mock(IConfiguration.class);
+ private static final HttpSdcResponse httpSdcResponse = mock(HttpSdcResponse.class);
+ private static final Map<String, String> mockHeaders = new HashMap<>();
+ private static SdcConnectorClient sdcClient;
- private static final String MOCK_ENV = "MockEnv";
- private static final String MOCK_API_KEY = "MockApikey";
private static final String ARTIFACT_URL = "http://127.0.0.1/artifact/url";
private static final String IT_JUST_DIDN_T_WORK = "It just didn't work";
private static final List<String> ARTIFACT_TYPES = Arrays.asList("Service", "Resource", "VF", "VFC");
- private static final int PORT = 49512;
- private static final byte[] BYTES = new byte[]{0xA, 0xB, 0xC, 0xD};
- private static Gson gson = new GsonBuilder().create();
private static final String VALID_JSON_PAYLOAD = gson.toJson(ARTIFACT_TYPES);
- private static HttpAsdcClient httpClient = mock(HttpAsdcClient.class);
- private static IConfiguration configuration = mock(IConfiguration.class);
- private static ApiCredential apiCredential = mock(ApiCredential.class);
- private static HttpAsdcResponse httpAsdcResponse = mock(HttpAsdcResponse.class);
- @SuppressWarnings("unchecked")
- private static Either<TopicRegistrationResponse, DistributionClientResultImpl> mockResponse =
- Mockito.mock(Either.class);
- private static Map<String, String> mockHeaders = new HashMap<>();
- private static SdcConnectorClient asdcClient;
- Pair<HttpAsdcResponse, CloseableHttpResponse> mockPair = new Pair<>(httpAsdcResponse, null);
- private HttpEntity lastHttpEntity = null;
+ private static final String KAFKA_DATA = "{\"kafkaBootStrapServer\": \"onap-strimzi-kafka-bootstrap:9092\",\"distrNotificationTopicName\": \"SDC-DISTR-NOTIF-TOPIC-AUTO\",\"distrStatusTopicName\": \"SDC-DISTR-STATUS-TOPIC-AUTO\"}";
+ private static final String VALID_KAFKA_JSON_PAYLOAD = gson.toJson(KAFKA_DATA);
+ private static final int PORT = 49512;
+ private static final byte[] BYTES = new byte[] {0xA, 0xB, 0xC, 0xD};
@BeforeAll
public static void beforeClass() {
- asdcClient = Mockito.spy(new SdcConnectorClient(configuration, httpClient));
- when(apiCredential.getApiKey()).thenReturn(MOCK_API_KEY);
- when(httpAsdcResponse.getStatus()).thenReturn(HttpStatus.SC_OK);
+ sdcClient = Mockito.spy(new SdcConnectorClient(configuration, httpClient));
+ when(httpSdcResponse.getStatus()).thenReturn(HttpStatus.SC_OK);
- doReturn(mockHeaders).when(asdcClient).addHeadersToHttpRequest(Mockito.anyString());
- doReturn(mockResponse).when(asdcClient).parseRegistrationResponse(httpAsdcResponse);
- }
-
- @BeforeEach
- public void beforeMethod() {
- Mockito.reset(configuration, httpClient);
- lastHttpEntity = null;
- when(configuration.getEnvironmentName()).thenReturn(MOCK_ENV);
-
- doAnswer(new Answer<Pair<HttpAsdcResponse, CloseableHttpResponse>>() {
- @Override
- public Pair<HttpAsdcResponse, CloseableHttpResponse> answer(InvocationOnMock invocation) throws Throwable {
- lastHttpEntity = invocation.getArgument(1, HttpEntity.class);
- return mockPair;
- }
- }).when(httpClient).postRequest(eq(AsdcUrls.POST_FOR_TOPIC_REGISTRATION), any(HttpEntity.class), eq(mockHeaders), eq(false));
+ doReturn(mockHeaders).when(sdcClient).addHeadersToHttpRequest(Mockito.anyString());
}
@Test
- void initAndCloseTest() {
+ public void initAndCloseTest() {
IConfiguration conf = Mockito.mock(IConfiguration.class);
when(conf.getUser()).thenReturn("user");
when(conf.getPassword()).thenReturn("password");
when(conf.isUseHttpsWithSDC()).thenReturn(true);
when(conf.activateServerTLSAuth()).thenReturn(false);
- final HttpAsdcClient httpClient = new HttpAsdcClient(conf);
+ final HttpSdcClient httpClient = new HttpSdcClient(conf);
SdcConnectorClient client = new SdcConnectorClient(conf, httpClient);
client.close();
-
- assertThrows(IllegalStateException.class, () -> {
- //check if client is really closed
- httpClient.getRequest(AsdcUrls.POST_FOR_TOPIC_REGISTRATION, new HashMap<>());
- });
-
}
@Test
- void testConsumeProduceStatusTopicFalse() throws UnsupportedOperationException, IOException {
- testConsumeProduceStatusTopic(false);
+ void getValidKafkaDataHappyScenarioTest() throws IOException {
+ HttpSdcResponse responseMock = mock(HttpSdcResponse.class);
+ CloseableHttpResponse closeableHttpResponseMock = mock(CloseableHttpResponse.class);
+ HttpEntity messageMock = mock(HttpEntity.class);
+ Pair<HttpSdcResponse, CloseableHttpResponse> responsePair =
+ new Pair<>(responseMock, closeableHttpResponseMock);
+
+ when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK);
+ when(responseMock.getMessage()).thenReturn(messageMock);
+ when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(KAFKA_DATA.getBytes()));
+ when(httpClient.getRequest(eq(SdcUrls.GET_KAFKA_DIST_DATA), Matchers.any(), eq(false)))
+ .thenReturn(responsePair);
+
+ Either<KafkaDataResponse, IDistributionClientResult> result = sdcClient.getKafkaDistData();
+ assertTrue(result.isLeft());
+ KafkaDataResponse kafkaDataResponse = result.left().value();
+ assertEquals("SDC-DISTR-NOTIF-TOPIC-AUTO", kafkaDataResponse.getDistrNotificationTopicName());
}
@Test
- void testConsumeProduceStatusTopicTrue() throws UnsupportedOperationException, IOException {
- testConsumeProduceStatusTopic(true);
- }
+ void getValidKafkaDataErrorResponseScenarioTest() throws IOException {
+ HttpSdcResponse responseMock = mock(HttpSdcResponse.class);
+ HttpEntity messageMock = mock(HttpEntity.class);
+ Pair<HttpSdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
+
+ when(responseMock.getStatus()).thenReturn(HttpStatus.SC_GATEWAY_TIMEOUT);
+ when(responseMock.getMessage()).thenReturn(messageMock);
+ when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(IT_JUST_DIDN_T_WORK.getBytes()));
+ when(httpClient.getRequest(eq(SdcUrls.GET_KAFKA_DIST_DATA), Matchers.any(), eq(false)))
+ .thenReturn(responsePair);
- private void testConsumeProduceStatusTopic(final boolean isConsumeProduceStatusFlag) throws IOException {
- when(configuration.isConsumeProduceStatusTopic()).thenReturn(isConsumeProduceStatusFlag);
- asdcClient.registerAsdcTopics(apiCredential);
- verify(httpClient, times(1)).postRequest(eq(AsdcUrls.POST_FOR_TOPIC_REGISTRATION), any(HttpEntity.class), eq(mockHeaders), eq(false));
- assertNotNull(lastHttpEntity);
- RegistrationRequest actualRegRequest
- = gson.fromJson(IOUtils.toString(lastHttpEntity.getContent(), StandardCharsets.UTF_8), RegistrationRequest.class);
- RegistrationRequest expectedRegRequest
- = gson.fromJson(excpectedStringBody(isConsumeProduceStatusFlag), RegistrationRequest.class);
-
- assertEquals(expectedRegRequest.getApiPublicKey(), actualRegRequest.getApiPublicKey());
- assertEquals(expectedRegRequest.getDistrEnvName(), actualRegRequest.getDistrEnvName());
- assertEquals(expectedRegRequest.getIsConsumerToSdcDistrStatusTopic(), actualRegRequest.getIsConsumerToSdcDistrStatusTopic());
+ Either<KafkaDataResponse, IDistributionClientResult> result = sdcClient.getKafkaDistData();
+ assertTrue(result.isRight());
+ IDistributionClientResult distributionClientResult = result.right().value();
+ assertEquals(DistributionActionResultEnum.SDC_SERVER_TIMEOUT,
+ distributionClientResult.getDistributionActionResult());
}
@Test
- void getValidArtifactTypesListHappyScenarioTest() throws IOException {
- HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class);
+ public void getValidArtifactTypesListHappyScenarioTest() throws IOException {
+ HttpSdcResponse responseMock = mock(HttpSdcResponse.class);
CloseableHttpResponse closeableHttpResponseMock = mock(CloseableHttpResponse.class);
HttpEntity messageMock = mock(HttpEntity.class);
- Pair<HttpAsdcResponse, CloseableHttpResponse> responsePair =
+ Pair<HttpSdcResponse, CloseableHttpResponse> responsePair =
new Pair<>(responseMock, closeableHttpResponseMock);
when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK);
when(responseMock.getMessage()).thenReturn(messageMock);
when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(VALID_JSON_PAYLOAD.getBytes()));
- when(httpClient.getRequest(eq(AsdcUrls.GET_VALID_ARTIFACT_TYPES), any(), eq(false)))
+ when(httpClient.getRequest(eq(SdcUrls.GET_VALID_ARTIFACT_TYPES), Matchers.any(), eq(false)))
.thenReturn(responsePair);
- Either<List<String>, IDistributionClientResult> result = asdcClient.getValidArtifactTypesList();
+ Either<List<String>, IDistributionClientResult> result = sdcClient.getValidArtifactTypesList();
assertTrue(result.isLeft());
List<String> list = result.left().value();
assertEquals(ARTIFACT_TYPES, list);
}
@Test
- void getValidArtifactTypesListErrorResponseScenarioTest() throws IOException {
- HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class);
+ public void getValidArtifactTypesListErrorResponseScenarioTest() throws IOException {
+ HttpSdcResponse responseMock = mock(HttpSdcResponse.class);
HttpEntity messageMock = mock(HttpEntity.class);
- Pair<HttpAsdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
+ Pair<HttpSdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
when(responseMock.getStatus()).thenReturn(HttpStatus.SC_GATEWAY_TIMEOUT);
when(responseMock.getMessage()).thenReturn(messageMock);
when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(IT_JUST_DIDN_T_WORK.getBytes()));
- when(httpClient.getRequest(eq(AsdcUrls.GET_VALID_ARTIFACT_TYPES), any(), eq(false)))
+ when(httpClient.getRequest(eq(SdcUrls.GET_VALID_ARTIFACT_TYPES), Matchers.any(), eq(false)))
.thenReturn(responsePair);
- Either<List<String>, IDistributionClientResult> result = asdcClient.getValidArtifactTypesList();
+ Either<List<String>, IDistributionClientResult> result = sdcClient.getValidArtifactTypesList();
assertTrue(result.isRight());
IDistributionClientResult distributionClientResult = result.right().value();
- assertEquals(DistributionActionResultEnum.ASDC_SERVER_TIMEOUT,
+ assertEquals(DistributionActionResultEnum.SDC_SERVER_TIMEOUT,
distributionClientResult.getDistributionActionResult());
}
@Test
- void getValidArtifactTypesListExceptionDuringConnectionClosingTest() throws IOException {
- HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class);
+ public void getValidArtifactTypesListExceptionDuringConnectionClosingTest() throws IOException {
+ HttpSdcResponse responseMock = mock(HttpSdcResponse.class);
CloseableHttpResponse closeableHttpResponseMock = mock(CloseableHttpResponse.class);
HttpEntity messageMock = mock(HttpEntity.class);
- Pair<HttpAsdcResponse, CloseableHttpResponse> responsePair =
+ Pair<HttpSdcResponse, CloseableHttpResponse> responsePair =
new Pair<>(responseMock, closeableHttpResponseMock);
when(responseMock.getStatus()).thenReturn(HttpStatus.SC_GATEWAY_TIMEOUT);
when(responseMock.getMessage()).thenReturn(messageMock);
when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(VALID_JSON_PAYLOAD.getBytes()));
- when(httpClient.getRequest(eq(AsdcUrls.GET_VALID_ARTIFACT_TYPES), any(), eq(false)))
+ when(httpClient.getRequest(eq(SdcUrls.GET_VALID_ARTIFACT_TYPES), Matchers.any(), eq(false)))
.thenReturn(responsePair);
doThrow(new IOException("Test exception")).when(closeableHttpResponseMock).close();
- Either<List<String>, IDistributionClientResult> result = asdcClient.getValidArtifactTypesList();
+ Either<List<String>, IDistributionClientResult> result = sdcClient.getValidArtifactTypesList();
assertTrue(result.isRight());
IDistributionClientResult distributionClientResult = result.right().value();
- assertEquals(DistributionActionResultEnum.ASDC_SERVER_TIMEOUT,
+ assertEquals(DistributionActionResultEnum.SDC_SERVER_TIMEOUT,
distributionClientResult.getDistributionActionResult());
}
@Test
- void getValidArtifactTypesListParsingExceptionHandlingTest() throws IOException {
- HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class);
+ public void getValidArtifactTypesListParsingExceptionHandlingTest() throws IOException {
+ HttpSdcResponse responseMock = mock(HttpSdcResponse.class);
CloseableHttpResponse closeableHttpResponseMock = mock(CloseableHttpResponse.class);
HttpEntity messageMock = mock(HttpEntity.class);
- Pair<HttpAsdcResponse, CloseableHttpResponse> responsePair =
+ Pair<HttpSdcResponse, CloseableHttpResponse> responsePair =
new Pair<>(responseMock, closeableHttpResponseMock);
when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK);
when(responseMock.getMessage()).thenReturn(messageMock);
when(messageMock.getContent()).thenReturn(new ThrowingInputStreamForTesting());
- when(httpClient.getRequest(eq(AsdcUrls.GET_VALID_ARTIFACT_TYPES), any(), eq(false)))
+ when(httpClient.getRequest(eq(SdcUrls.GET_VALID_ARTIFACT_TYPES), Matchers.any(), eq(false)))
.thenReturn(responsePair);
- Either<List<String>, IDistributionClientResult> result = asdcClient.getValidArtifactTypesList();
+ Either<List<String>, IDistributionClientResult> result = sdcClient.getValidArtifactTypesList();
assertTrue(result.isRight());
IDistributionClientResult distributionClientResult = result.right().value();
assertEquals(DistributionActionResultEnum.GENERAL_ERROR,
@@ -243,59 +218,17 @@ class SdcConnectorClientTest {
}
@Test
- void unregisterTopicsErrorDuringProcessingTest() throws IOException {
- when(configuration.getAsdcAddress()).thenReturn("127.0.0.1" + PORT);
- when(configuration.isConsumeProduceStatusTopic()).thenReturn(false);
- when(configuration.getMsgBusAddress())
- .thenReturn(Arrays.asList("http://127.0.0.1:45321/dmaap", "http://127.0.0.1:45321/dmaap"));
-
- String failMessage = "It just didn't work";
- HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class);
- HttpEntity messageMock = mock(HttpEntity.class);
- Pair<HttpAsdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
-
- when(responseMock.getStatus()).thenReturn(HttpStatus.SC_BAD_GATEWAY);
- when(responseMock.getMessage()).thenReturn(messageMock);
- when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(failMessage.getBytes()));
- doReturn(responsePair).when(httpClient)
- .postRequest(eq(AsdcUrls.POST_FOR_UNREGISTER), any(HttpEntity.class), any(), eq(false));
-
- IDistributionClientResult result = asdcClient.unregisterTopics(apiCredential);
- assertEquals(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, result.getDistributionActionResult());
- }
-
- @Test
- void unregisterTopicsHappyScenarioTest() throws IOException {
- when(configuration.getAsdcAddress()).thenReturn("127.0.0.1" + PORT);
- when(configuration.isConsumeProduceStatusTopic()).thenReturn(false);
-
- String failMessage = "";
- HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class);
- HttpEntity messageMock = mock(HttpEntity.class);
- Pair<HttpAsdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
-
- when(responseMock.getStatus()).thenReturn(HttpStatus.SC_NO_CONTENT);
- when(responseMock.getMessage()).thenReturn(messageMock);
- when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(failMessage.getBytes()));
- doReturn(responsePair).when(httpClient)
- .postRequest(eq(AsdcUrls.POST_FOR_UNREGISTER), any(HttpEntity.class), any(), eq(false));
-
- IDistributionClientResult result = asdcClient.unregisterTopics(apiCredential);
- assertEquals(DistributionActionResultEnum.SUCCESS, result.getDistributionActionResult());
- }
-
- @Test
- void downloadArtifactHappyScenarioTest() throws IOException {
+ public void downloadArtifactHappyScenarioTest() throws IOException {
Map<String, String> headers = new HashMap<>();
- headers.put(asdcClient.CONTENT_DISPOSITION_HEADER, "SomeHeader");
+ headers.put(SdcConnectorClient.CONTENT_DISPOSITION_HEADER, "SomeHeader");
IArtifactInfo artifactInfo = mock(IArtifactInfo.class);
when(artifactInfo.getArtifactURL()).thenReturn(ARTIFACT_URL);
when(artifactInfo.getArtifactChecksum()).thenReturn(Hashing.md5().hashBytes(BYTES).toString());
- HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class);
+ HttpSdcResponse responseMock = mock(HttpSdcResponse.class);
HttpEntity messageMock = mock(HttpEntity.class);
- Pair<HttpAsdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
+ Pair<HttpSdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK);
when(responseMock.getMessage()).thenReturn(messageMock);
@@ -303,70 +236,62 @@ class SdcConnectorClientTest {
when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(BYTES));
doReturn(responsePair).when(httpClient).getRequest(eq(ARTIFACT_URL), any(), eq(false));
- IDistributionClientResult result = asdcClient.downloadArtifact(artifactInfo);
+ IDistributionClientResult result = sdcClient.downloadArtifact(artifactInfo);
assertEquals(DistributionActionResultEnum.SUCCESS, result.getDistributionActionResult());
}
@Test
- void downloadArtifactDataIntegrityProblemTest() throws IOException {
+ public void downloadArtifactDataIntegrityProblemTest() throws IOException {
IArtifactInfo artifactInfo = mock(IArtifactInfo.class);
when(artifactInfo.getArtifactURL()).thenReturn(ARTIFACT_URL);
- HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class);
+ HttpSdcResponse responseMock = mock(HttpSdcResponse.class);
HttpEntity messageMock = mock(HttpEntity.class);
- Pair<HttpAsdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
+ Pair<HttpSdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK);
when(responseMock.getMessage()).thenReturn(messageMock);
when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(BYTES));
doReturn(responsePair).when(httpClient).getRequest(eq(ARTIFACT_URL), any(), eq(false));
- IDistributionClientResult result = asdcClient.downloadArtifact(artifactInfo);
+ IDistributionClientResult result = sdcClient.downloadArtifact(artifactInfo);
assertEquals(DistributionActionResultEnum.DATA_INTEGRITY_PROBLEM, result.getDistributionActionResult());
}
@Test
- void downloadArtifactExceptionDuringDownloadHandlingTest() throws IOException {
+ public void downloadArtifactExceptionDuringDownloadHandlingTest() throws IOException {
IArtifactInfo artifactInfo = mock(IArtifactInfo.class);
when(artifactInfo.getArtifactURL()).thenReturn(ARTIFACT_URL);
- HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class);
+ HttpSdcResponse responseMock = mock(HttpSdcResponse.class);
HttpEntity messageMock = mock(HttpEntity.class);
- Pair<HttpAsdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
+ Pair<HttpSdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK);
when(responseMock.getMessage()).thenReturn(messageMock);
when(messageMock.getContent()).thenReturn(new ThrowingInputStreamForTesting());
doReturn(responsePair).when(httpClient).getRequest(eq(ARTIFACT_URL), any(), eq(false));
- IDistributionClientResult result = asdcClient.downloadArtifact(artifactInfo);
+ IDistributionClientResult result = sdcClient.downloadArtifact(artifactInfo);
assertEquals(DistributionActionResultEnum.GENERAL_ERROR, result.getDistributionActionResult());
}
@Test
- void downloadArtifactHandleDownloadErrorTest() throws IOException {
+ public void downloadArtifactHandleDownloadErrorTest() throws IOException {
IArtifactInfo artifactInfo = mock(IArtifactInfo.class);
when(artifactInfo.getArtifactURL()).thenReturn(ARTIFACT_URL);
- HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class);
+ HttpSdcResponse responseMock = mock(HttpSdcResponse.class);
HttpEntity messageMock = mock(HttpEntity.class);
- Pair<HttpAsdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
+ Pair<HttpSdcResponse, CloseableHttpResponse> responsePair = new Pair<>(responseMock, null);
when(responseMock.getStatus()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR);
when(responseMock.getMessage()).thenReturn(messageMock);
when(messageMock.getContent()).thenReturn(new ThrowingInputStreamForTesting());
doReturn(responsePair).when(httpClient).getRequest(eq(ARTIFACT_URL), any(), eq(false));
- IDistributionClientResult result = asdcClient.downloadArtifact(artifactInfo);
- assertEquals(DistributionActionResultEnum.ASDC_SERVER_PROBLEM, result.getDistributionActionResult());
- }
-
- private String excpectedStringBody(boolean isConsumeProduceStatusTopic) {
- String stringBodyTemplate =
- "{\r\n" + " \"apiPublicKey\": \"MockApikey\",\r\n" + " \"distrEnvName\": \"MockEnv\",\r\n"
- + " \"isConsumerToSdcDistrStatusTopic\": %s\r\n" + "}";
- return String.format(stringBodyTemplate, isConsumeProduceStatusTopic);
-
+ IDistributionClientResult result = sdcClient.downloadArtifact(artifactInfo);
+ assertEquals(DistributionActionResultEnum.SDC_SERVER_PROBLEM, result.getDistributionActionResult());
}
static class ThrowingInputStreamForTesting extends InputStream {
@@ -376,4 +301,4 @@ class SdcConnectorClientTest {
throw new IOException("Not implemented. This is expected as the implementation is for unit tests only.");
}
}
-}
+} \ No newline at end of file
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/impl/DistributionClientTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/impl/DistributionClientTest.java
index 0297918..7354c6c 100644
--- a/sdc-distribution-client/src/test/java/org/onap/sdc/impl/DistributionClientTest.java
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/impl/DistributionClientTest.java
@@ -22,29 +22,29 @@
package org.onap.sdc.impl;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
-import com.att.nsa.apiClient.credentials.ApiCredential;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
-import com.att.nsa.cambria.client.CambriaIdentityManager;
import fj.data.Either;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
import org.mockito.Mockito;
import org.onap.sdc.api.IDistributionClient;
import org.onap.sdc.api.consumer.IConfiguration;
import org.onap.sdc.api.notification.IArtifactInfo;
import org.onap.sdc.api.notification.IVfModuleMetadata;
import org.onap.sdc.api.results.IDistributionClientResult;
-import org.onap.sdc.http.HttpAsdcClient;
+import org.onap.sdc.http.HttpSdcClient;
import org.onap.sdc.http.SdcConnectorClient;
-import org.onap.sdc.http.TopicRegistrationResponse;
import org.onap.sdc.utils.ArtifactTypeEnum;
import org.onap.sdc.utils.ArtifactsUtils;
import org.onap.sdc.utils.DistributionActionResultEnum;
@@ -52,13 +52,14 @@ import org.onap.sdc.utils.Pair;
import org.onap.sdc.utils.TestConfiguration;
import org.onap.sdc.utils.TestNotificationCallback;
import org.onap.sdc.utils.Wrapper;
+import org.onap.sdc.utils.kafka.KafkaDataResponse;
+import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
class DistributionClientTest {
-
- static CambriaIdentityManager cc;
DistributionClientImpl client = Mockito.spy(new DistributionClientImpl());
IConfiguration testConfiguration = new TestConfiguration();
SdcConnectorClient connector = Mockito.mock(SdcConnectorClient.class);
+ SdcKafkaConsumer consumer = mock(SdcKafkaConsumer.class);
@AfterEach
@@ -68,8 +69,8 @@ class DistributionClientTest {
@Test
void validateConfigurationTest() {
- final Pair<DistributionActionResultEnum, Configuration> distributionActionResultEnumConfigurationPair = client.validateAndInitConfiguration(
- new Wrapper<IDistributionClientResult>(), testConfiguration);
+ final Pair<DistributionActionResultEnum, Configuration> distributionActionResultEnumConfigurationPair = client.validateAndInitConfiguration(
+ new Wrapper<>(), testConfiguration);
DistributionActionResultEnum validationResult = distributionActionResultEnumConfigurationPair.getFirst();
Configuration configuration = distributionActionResultEnumConfigurationPair.getSecond();
assertEquals(DistributionActionResultEnum.SUCCESS, validationResult);
@@ -82,7 +83,7 @@ class DistributionClientTest {
TestConfiguration userConfig = new TestConfiguration();
userConfig.setPollingInterval(1);
userConfig.setPollingTimeout(2);
- final Pair<DistributionActionResultEnum, Configuration> distributionActionResultEnumConfigurationPair = client.validateAndInitConfiguration(
+ final Pair<DistributionActionResultEnum, Configuration> distributionActionResultEnumConfigurationPair = client.validateAndInitConfiguration(
new Wrapper<>(), userConfig);
DistributionActionResultEnum validationResult = distributionActionResultEnumConfigurationPair.getFirst();
Configuration configuration = distributionActionResultEnumConfigurationPair.getSecond();
@@ -120,23 +121,8 @@ class DistributionClientTest {
}
@Test
- void initWithMocksBadConfigurationTest() throws HttpException, CambriaApiException, IOException {
-
- TopicRegistrationResponse topics = new TopicRegistrationResponse();
- topics.setDistrNotificationTopicName("notificationTopic");
- topics.setDistrStatusTopicName("statusTopic");
- Either<TopicRegistrationResponse, DistributionClientResultImpl> topicsResult = Either.left(topics);
- Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult);
-
- reconfigureAsdcConnector(connector, client);
-
- // cambriaMock
-
- CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class);
- Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class)))
- .thenReturn(new ApiCredential("public", "secret"));
- client.cambriaIdentityManager = cambriaMock;
-
+ public void initWithMocksBadConfigurationTest() {
+ reconfigureSdcConnector(connector, client);
// no password
TestConfiguration testPassword = new TestConfiguration();
testPassword.setPassword(null);
@@ -157,19 +143,19 @@ class DistributionClientTest {
validationResult = client.init(testUser, new TestNotificationCallback());
assertEquals(DistributionActionResultEnum.CONF_MISSING_USERNAME, validationResult.getDistributionActionResult());
- // no ASDC server fqdn
+ // no SDC server fqdn
TestConfiguration testServerFqdn = new TestConfiguration();
- testServerFqdn.setAsdcAddress(null);
+ testServerFqdn.setSdcAddress(null);
validationResult = client.init(testServerFqdn, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN, validationResult.getDistributionActionResult());
+ assertEquals(DistributionActionResultEnum.CONF_MISSING_SDC_FQDN, validationResult.getDistributionActionResult());
- testServerFqdn.setAsdcAddress("");
+ testServerFqdn.setSdcAddress("");
validationResult = client.init(testServerFqdn, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN, validationResult.getDistributionActionResult());
+ assertEquals(DistributionActionResultEnum.CONF_MISSING_SDC_FQDN, validationResult.getDistributionActionResult());
- testServerFqdn.setAsdcAddress("this##is##bad##fqdn");
+ testServerFqdn.setSdcAddress("this##is##bad##fqdn");
validationResult = client.init(testServerFqdn, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.CONF_INVALID_ASDC_FQDN, validationResult.getDistributionActionResult());
+ assertEquals(DistributionActionResultEnum.CONF_INVALID_SDC_FQDN, validationResult.getDistributionActionResult());
// no consumerId
TestConfiguration testConsumerId = new TestConfiguration();
@@ -191,93 +177,63 @@ class DistributionClientTest {
validationResult = client.init(testEnv, new TestNotificationCallback());
assertEquals(DistributionActionResultEnum.CONF_MISSING_ENVIRONMENT_NAME, validationResult.getDistributionActionResult());
- Mockito.verify(client, Mockito.times(0)).getUEBServerList();
- Mockito.verify(cambriaMock, Mockito.times(0)).createApiKey(Mockito.anyString(), Mockito.anyString());
- Mockito.verify(connector, Mockito.times(0)).registerAsdcTopics(Mockito.any(ApiCredential.class));
}
- private void reconfigureAsdcConnector(SdcConnectorClient connector, DistributionClientImpl client) {
- doReturn(connector).when(client).createAsdcConnector(any());
+ private void reconfigureSdcConnector(SdcConnectorClient connector, DistributionClientImpl client) {
+ doReturn(connector).when(client).createSdcConnector(any());
}
@Test
- void initFailedConnectAsdcTest() throws HttpException, CambriaApiException, IOException {
- // cambriaMock
-
- CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class);
- Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class)))
- .thenReturn(new ApiCredential("public", "secret"));
- client.cambriaIdentityManager = cambriaMock;
+ public void initFailedConnectSdcTest() {
- TestConfiguration badAsdcConfig = new TestConfiguration();
- if (badAsdcConfig.isUseHttpsWithSDC() == null) {
+ TestConfiguration badSdcConfig = new TestConfiguration();
+ if (badSdcConfig.isUseHttpsWithSDC() == null) {
System.out.println("null for HTTPS then TRUE");
} else {
- System.out.println("isUseHttpsWithSDC set to " + badAsdcConfig.isUseHttpsWithSDC());
+ System.out.println("isUseHttpsWithSDC set to " + badSdcConfig.isUseHttpsWithSDC());
}
- badAsdcConfig.setAsdcAddress("badhost:8080");
+ badSdcConfig.setSdcAddress("badhost:8080");
- IDistributionClientResult init = client.init(badAsdcConfig, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, init.getDistributionActionResult());
+ IDistributionClientResult init = client.init(badSdcConfig, new TestNotificationCallback());
+ assertEquals(DistributionActionResultEnum.SDC_CONNECTION_FAILED, init.getDistributionActionResult());
- badAsdcConfig = new TestConfiguration();
- badAsdcConfig.setAsdcAddress("localhost:8181");
+ badSdcConfig = new TestConfiguration();
+ badSdcConfig.setSdcAddress("localhost:8181");
- init = client.init(badAsdcConfig, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, init.getDistributionActionResult());
+ init = client.init(badSdcConfig, new TestNotificationCallback());
+ assertEquals(DistributionActionResultEnum.SDC_CONNECTION_FAILED, init.getDistributionActionResult());
}
@Test
- void initFailedConnectAsdcInHttpTest() throws HttpException, CambriaApiException, IOException {
- // cambriaMock
+ public void initFailedConnectSdcInHttpTest() {
- CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class);
- Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class)))
- .thenReturn(new ApiCredential("public", "secret"));
- client.cambriaIdentityManager = cambriaMock;
+ TestConfiguration badSdcConfig = new TestConfiguration();
+ badSdcConfig.setSdcAddress("badhost:8080");
+ badSdcConfig.setUseHttpsWithSDC(false);
- TestConfiguration badAsdcConfig = new TestConfiguration();
- badAsdcConfig.setAsdcAddress("badhost:8080");
- badAsdcConfig.setUseHttpsWithSDC(false);
+ IDistributionClientResult init = client.init(badSdcConfig, new TestNotificationCallback());
+ assertEquals(DistributionActionResultEnum.SDC_CONNECTION_FAILED, init.getDistributionActionResult());
- IDistributionClientResult init = client.init(badAsdcConfig, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, init.getDistributionActionResult());
+ badSdcConfig = new TestConfiguration();
+ badSdcConfig.setSdcAddress("localhost:8181");
+ badSdcConfig.setUseHttpsWithSDC(false);
- badAsdcConfig = new TestConfiguration();
- badAsdcConfig.setAsdcAddress("localhost:8181");
- badAsdcConfig.setUseHttpsWithSDC(false);
-
- init = client.init(badAsdcConfig, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, init.getDistributionActionResult());
+ init = client.init(badSdcConfig, new TestNotificationCallback());
+ assertEquals(DistributionActionResultEnum.SDC_CONNECTION_FAILED, init.getDistributionActionResult());
}
@Test
- void getConfigurationTest() throws HttpException, CambriaApiException, IOException {
+ public void getConfigurationTest() {
// connectorMock
mockArtifactTypeList();
- TopicRegistrationResponse topics = new TopicRegistrationResponse();
- topics.setDistrNotificationTopicName("notificationTopic");
- topics.setDistrStatusTopicName("statusTopic");
- Either<TopicRegistrationResponse, DistributionClientResultImpl> topicsResult = Either.left(topics);
- Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult);
- IDistributionClientResult success = initSuccesResult();
- Mockito.when(connector.unregisterTopics(Mockito.any(ApiCredential.class))).thenReturn(success);
-
- reconfigureAsdcConnector(connector, client);
+ mockKafkaData();
+ reconfigureSdcConnector(connector, client);
+ TestConfiguration badSdcConfig = new TestConfiguration();
+ badSdcConfig.setPollingInterval(-5);
- // cambriaMock
-
- CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class);
- Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class)))
- .thenReturn(new ApiCredential("public", "secret"));
- client.cambriaIdentityManager = cambriaMock;
-
- TestConfiguration badAsdcConfig = new TestConfiguration();
- badAsdcConfig.setPollingInterval(-5);
-
- IDistributionClientResult init = client.init(badAsdcConfig, new TestNotificationCallback());
+ IDistributionClientResult init = client.init(badSdcConfig, new TestNotificationCallback());
assertEquals(DistributionActionResultEnum.SUCCESS, init.getDistributionActionResult());
String confString = client.getConfiguration().toString();
@@ -301,32 +257,12 @@ class DistributionClientTest {
}
@Test
- void initWithMocksTest() throws HttpException, CambriaApiException, IOException {
-
+ public void initWithMocksTest() {
mockArtifactTypeList();
-
- TopicRegistrationResponse topics = new TopicRegistrationResponse();
- topics.setDistrNotificationTopicName("notificationTopic");
- topics.setDistrStatusTopicName("statusTopic");
- Either<TopicRegistrationResponse, DistributionClientResultImpl> topicsResult = Either.left(topics);
- Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult);
- IDistributionClientResult success = initSuccesResult();
- Mockito.when(connector.unregisterTopics(Mockito.any(ApiCredential.class))).thenReturn(success);
-
- reconfigureAsdcConnector(connector, client);
-
- // cambriaMock
-
- CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class);
- Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class)))
- .thenReturn(new ApiCredential("public", "secret"));
- client.cambriaIdentityManager = cambriaMock;
-
+ mockKafkaData();
+ reconfigureSdcConnector(connector, client);
IDistributionClientResult initResponse = client.init(testConfiguration, new TestNotificationCallback());
assertEquals(DistributionActionResultEnum.SUCCESS, initResponse.getDistributionActionResult());
- Mockito.verify(client, Mockito.times(1)).getUEBServerList();
- Mockito.verify(cambriaMock, Mockito.times(1)).createApiKey(Mockito.anyString(), Mockito.anyString());
- Mockito.verify(connector, Mockito.times(1)).registerAsdcTopics(Mockito.any(ApiCredential.class));
System.out.println(initResponse);
}
@@ -340,137 +276,31 @@ class DistributionClientTest {
Mockito.when(connector.getValidArtifactTypesList()).thenReturn(eitherArtifactTypes);
}
- @Test
- void testAlreadyInitTest() throws HttpException, CambriaApiException, IOException {
- initWithMocksTest();
- IDistributionClientResult initResponse = client.init(testConfiguration, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, initResponse.getDistributionActionResult());
+ private void mockKafkaData() {
+ KafkaDataResponse kafkaData = new KafkaDataResponse();
+ kafkaData.setKafkaBootStrapServer("localhost:9092");
+ kafkaData.setDistrNotificationTopicName("SDC-DISTR-NOTIF-TOPIC-AUTO");
+ kafkaData.setDistrStatusTopicName("SDC-DISTR-STATUS-TOPIC-AUTO");
+ final Either<KafkaDataResponse, IDistributionClientResult> eitherArtifactTypes = Either.left(kafkaData);
+ Mockito.when(connector.getKafkaDistData()).thenReturn(eitherArtifactTypes);
}
@Test
- void initGetServerFailedTest() throws HttpException, CambriaApiException, IOException {
-
- // connectorMock
- IDistributionClientResult getServersResult = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_SERVER_PROBLEM, "problem");
- Either<List<String>, IDistributionClientResult> serversResult = Either.right(getServersResult);
- doReturn(serversResult).when(client).getUEBServerList();
-
- TopicRegistrationResponse topics = new TopicRegistrationResponse();
- topics.setDistrNotificationTopicName("notificationTopic");
- topics.setDistrStatusTopicName("statusTopic");
- Either<TopicRegistrationResponse, DistributionClientResultImpl> topicsResult = Either.left(topics);
- Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult);
-
- reconfigureAsdcConnector(connector, client);
-
- // cambriaMock
-
- CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class);
- Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class)))
- .thenReturn(new ApiCredential("public", "secret"));
- client.cambriaIdentityManager = cambriaMock;
-
+ void testAlreadyInitTest() {
+ initWithMocksTest();
IDistributionClientResult initResponse = client.init(testConfiguration, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.ASDC_SERVER_PROBLEM, initResponse.getDistributionActionResult());
-
- Mockito.verify(client, Mockito.times(1)).getUEBServerList();
- Mockito.verify(cambriaMock, Mockito.times(0)).createApiKey(Mockito.anyString(), Mockito.anyString());
- Mockito.verify(connector, Mockito.times(0)).registerAsdcTopics(Mockito.any(ApiCredential.class));
-
- System.out.println(initResponse);
- }
-
- @Test
- void initCreateKeysFailedTest() throws HttpException, CambriaApiException, IOException {
-
- // connectorMock
- mockArtifactTypeList();
-
- TopicRegistrationResponse topics = new TopicRegistrationResponse();
- topics.setDistrNotificationTopicName("notificationTopic");
- topics.setDistrStatusTopicName("statusTopic");
- Either<TopicRegistrationResponse, DistributionClientResultImpl> topicsResult = Either.left(topics);
- Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult);
-
- reconfigureAsdcConnector(connector, client);
-
- // cambriaMock
-
- CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class);
- Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class))).thenThrow(new CambriaApiException("failure"));
- client.cambriaIdentityManager = cambriaMock;
-
- IDistributionClientResult initResponse = client.init(testConfiguration, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, initResponse.getDistributionActionResult());
-
- Mockito.verify(client, Mockito.times(1)).getUEBServerList();
- Mockito.verify(cambriaMock, Mockito.times(1)).createApiKey(Mockito.anyString(), Mockito.anyString());
- Mockito.verify(connector, Mockito.times(0)).registerAsdcTopics(Mockito.any(ApiCredential.class));
- System.out.println(initResponse);
- }
-
- @Test
- void initRegistrationFailedTest() throws HttpException, CambriaApiException, IOException {
-
- // connectorMock
- mockArtifactTypeList();
- DistributionClientResultImpl failureResult = new DistributionClientResultImpl(DistributionActionResultEnum.BAD_REQUEST, "Bad Request");
- Either<TopicRegistrationResponse, DistributionClientResultImpl> topicsResult = Either.right(failureResult);
- Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult);
-
- reconfigureAsdcConnector(connector, client);
-
- // cambriaMock
-
- CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class);
- Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class)))
- .thenReturn(new ApiCredential("public", "secret"));
- client.cambriaIdentityManager = cambriaMock;
-
- IDistributionClientResult initResponse = client.init(testConfiguration, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.BAD_REQUEST, initResponse.getDistributionActionResult());
- Mockito.verify(client, Mockito.times(1)).getUEBServerList();
- Mockito.verify(cambriaMock, Mockito.times(1)).createApiKey(Mockito.anyString(), Mockito.anyString());
- Mockito.verify(connector, Mockito.times(1)).registerAsdcTopics(Mockito.any(ApiCredential.class));
- System.out.println(initResponse);
+ assertEquals(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, initResponse.getDistributionActionResult());
}
@Test
void testStartWithoutInit() {
- IDistributionClientResult result = client.start();
- assertEquals(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, result.getDistributionActionResult());
- }
-
- private IArtifactInfo initArtifactInfo() {
- ArtifactInfoImpl artifactInfo = new ArtifactInfoImpl();
- artifactInfo.setArtifactURL("/sdc/v1/services/serviceName/0.1/artifacts/aaa.hh");
- artifactInfo.setArtifactChecksum(ArtifactsUtils.getValidChecksum());
- return artifactInfo;
- }
-
- // ########### TESTS TO ADD TO CI START ###########
- /*public void createKeysTestCI() throws MalformedURLException, GeneralSecurityException {
- validateConfigurationTest();
- CambriaIdentityManager trueCambria = new CambriaClientBuilders.IdentityManagerBuilder().usingHttps().usingHosts(serverList).build();
- client.cambriaIdentityManager = trueCambria;
- DistributionClientResultImpl keysResult = client.createUebKeys();
- assertEquals(DistributionActionResultEnum.SUCCESS, keysResult.getDistributionActionResult());
- assertFalse(client.credential.getApiKey().isEmpty());
- assertFalse(client.credential.getApiSecret().isEmpty());
-
- System.out.println(keysResult);
- System.out.println("keys: public=" + client.credential.getApiKey() + " | secret=" + client.credential.getApiSecret());
- }
-*/
- public void initTestCI() {
- IDistributionClient distributionClient = DistributionClientFactory.createDistributionClient();
- IDistributionClientResult init = distributionClient.init(testConfiguration, new TestNotificationCallback());
- assertEquals(DistributionActionResultEnum.SUCCESS, init.getDistributionActionResult());
-
+ IDistributionClientResult result = client.start();
+ assertSame(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED,
+ result.getDistributionActionResult());
}
@Test
- void testDecodeVfModuleArtifact() throws IOException {
+ void testDecodeVfModuleArtifact() {
String vfModuleContent = getVFModuleExample();
List<IVfModuleMetadata> decodeVfModuleArtifact = client.decodeVfModuleArtifact(vfModuleContent.getBytes());
assertEquals(1, decodeVfModuleArtifact.size());
@@ -504,23 +334,4 @@ class DistributionClientTest {
" }\r\n" +
"]";
}
-
-
- public void connectorRegisterCI() {
- SdcConnectorClient connector = new SdcConnectorClient(testConfiguration, new HttpAsdcClient(testConfiguration));
-
- ApiCredential creds = new ApiCredential("publicKey", "secretKey");
- Either<TopicRegistrationResponse, DistributionClientResultImpl> topicsFromAsdc = connector.registerAsdcTopics(creds);
- assertTrue(topicsFromAsdc.isLeft());
-
- }
-
- public void downloadArtifactTestCI() {
- SdcConnectorClient connector = new SdcConnectorClient(testConfiguration, new HttpAsdcClient(testConfiguration));
- IArtifactInfo artifactInfo = initArtifactInfo();
- connector.downloadArtifact(artifactInfo);
-
- }
- // ########### TESTS TO ADD TO CI END ###########
-
}
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/impl/NotificationConsumerTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/impl/NotificationConsumerTest.java
index 1e3db81..a70a537 100644
--- a/sdc-distribution-client/src/test/java/org/onap/sdc/impl/NotificationConsumerTest.java
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/impl/NotificationConsumerTest.java
@@ -21,30 +21,20 @@
package org.onap.sdc.impl;
-import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-import com.att.nsa.cambria.client.CambriaConsumer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.awaitility.Durations;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -56,359 +46,297 @@ import org.onap.sdc.utils.ArtifactTypeEnum;
import org.onap.sdc.utils.DistributionActionResultEnum;
import org.onap.sdc.utils.DistributionClientConstants;
import org.onap.sdc.utils.TestConfiguration;
+import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
class NotificationConsumerTest {
-
- final static IDistributionClientResult DISTRIBUTION_SUCCESS_RESULT = buildSuccessResult();
- private final CambriaConsumer cambriaConsumer = mock(CambriaConsumer.class);
- private final INotificationCallback clientCallback = spy(INotificationCallback.class);
- private final Queue<Iterable<String>> notificationsQueue = new LinkedList<>();
- private final DistributionClientImpl distributionClient = Mockito.spy(DistributionClientImpl.class);
- private List<String> artifactsTypes = List.of(ArtifactTypeEnum.HEAT.name());
- private final List<Boolean> notificationStatusResults = new ArrayList<>();
-
- private static IDistributionClientResult buildSuccessResult() {
- return new IDistributionClientResult() {
-
- @Override
- public String getDistributionMessageResult() {
- return "";
- }
-
- @Override
- public DistributionActionResultEnum getDistributionActionResult() {
- return DistributionActionResultEnum.SUCCESS;
- }
- };
- }
-
- private NotificationConsumer createNotificationConsumer() {
- return new NotificationConsumer(cambriaConsumer, clientCallback, artifactsTypes, distributionClient);
- }
-
- @BeforeEach
- public void beforeTest() throws IOException {
- Mockito.reset(clientCallback, distributionClient);
- when(cambriaConsumer.fetch()).then((Answer<Iterable<String>>) invocation -> {
- if (!notificationsQueue.isEmpty()) {
- return notificationsQueue.remove();
- } else {
- return new ArrayList<>();
- }
- });
- when(distributionClient.sendNotificationStatus(anyLong(), anyString(), any(ArtifactInfoImpl.class), anyBoolean()))
- .then((Answer<IDistributionClientResult>) invocation -> {
- boolean isNotified = (boolean) invocation.getArguments()[3];
- notificationStatusResults.add(isNotified);
- return DISTRIBUTION_SUCCESS_RESULT;
- });
-
- }
-
- @Test
- void testNoNotificationsSent() {
- ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE);
- ScheduledFuture<?> scheduledFuture = executorPool.scheduleAtFixedRate(createNotificationConsumer(), 0, 100, TimeUnit.MILLISECONDS);
- await().atMost(Durations.ONE_SECOND).until(() -> !scheduledFuture.isDone());
- executorPool.shutdown();
-
- Mockito.verify(clientCallback, Mockito.times(0)).activateCallback(any(INotificationData.class));
- }
-
- @Test
- void testNonRelevantNotificationSent() throws InterruptedException {
-
- simulateNotificationFromUEB(getAsdcServiceNotificationWithoutHeatArtifact());
- Mockito.verify(clientCallback, Mockito.times(0)).activateCallback(any(INotificationData.class));
- }
-
- @Test
- void testRelevantNotificationSent() throws InterruptedException {
- simulateNotificationFromUEB(getAsdcServiceNotificationWithHeatArtifact());
- Mockito.verify(clientCallback, Mockito.times(1)).activateCallback(any(INotificationData.class));
- }
-
- @Test
- void testNonExistingArtifactsNotificationSent() throws InterruptedException {
- simulateNotificationFromUEB(getAsdcNotificationWithNonExistentArtifact());
- Mockito.verify(clientCallback, Mockito.times(1)).activateCallback(any(INotificationData.class));
- }
-
- @Test
- void testNotificationStatusSent() throws InterruptedException {
- simulateNotificationFromUEB(getAsdcServiceNotificationWithHeatArtifact());
-
- Mockito.verify(distributionClient, Mockito.times(3))
- .sendNotificationStatus(anyLong(), anyString(), any(ArtifactInfoImpl.class), anyBoolean());
- assertEquals(1, countInstances(notificationStatusResults, Boolean.TRUE));
- assertEquals(2, countInstances(notificationStatusResults, Boolean.FALSE));
- }
-
- @Test
- void testNotificationRelatedArtifacts() throws InterruptedException {
- List<String> artifactTypesTmp = new ArrayList<>();
- for (ArtifactTypeEnum artifactTypeEnum : ArtifactTypeEnum.values()) {
- artifactTypesTmp.add(artifactTypeEnum.name());
- }
- artifactsTypes = artifactTypesTmp;
- simulateNotificationFromUEB(getAsdcServiceNotificationWithRelatedArtifacts());
-
- Mockito.verify(distributionClient, Mockito.times(3))
- .sendNotificationStatus(anyLong(), anyString(), any(ArtifactInfoImpl.class), anyBoolean());
- assertEquals(3, countInstances(notificationStatusResults, Boolean.TRUE));
- assertEquals(0, countInstances(notificationStatusResults, Boolean.FALSE));
- }
-
- @Test
- void testNotificationStatusWithServiceArtifatcs() throws InterruptedException {
- simulateNotificationFromUEB(getNotificationWithServiceArtifatcs());
- Mockito.verify(distributionClient, Mockito.times(6))
- .sendNotificationStatus(anyLong(), anyString(), any(ArtifactInfoImpl.class), anyBoolean());
- assertEquals(2, countInstances(notificationStatusResults, Boolean.TRUE));
- assertEquals(4, countInstances(notificationStatusResults, Boolean.FALSE));
-
- }
-
- @Test
- final void testBuildCallbackNotificationLogicFlagIsFalse() {
- NotificationConsumer consumer = createNotificationConsumer();
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- TestConfiguration testConfiguration = new TestConfiguration();
- testConfiguration.setFilterInEmptyResources(false);
- when(distributionClient.getConfiguration()).thenReturn(testConfiguration);
- NotificationDataImpl notification = gson.fromJson(getNotificationWithMultipleResources(), NotificationDataImpl.class);
- NotificationDataImpl notificationBuiltInClient = consumer.buildCallbackNotificationLogic(0, notification);
- assertEquals(1, notificationBuiltInClient.getResources().size());
- }
-
- @Test
- final void testBuildCallbackNotificationLogicFlagIsTrue() {
- NotificationConsumer consumer = createNotificationConsumer();
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- TestConfiguration testConfiguration = new TestConfiguration();
- testConfiguration.setFilterInEmptyResources(true);
- when(distributionClient.getConfiguration()).thenReturn(testConfiguration);
- NotificationDataImpl notification = gson.fromJson(getNotificationWithMultipleResources(), NotificationDataImpl.class);
- NotificationDataImpl notificationBuiltInClient = consumer.buildCallbackNotificationLogic(0, notification);
- assertEquals(2, notificationBuiltInClient.getResources().size());
- }
-
- private void simulateNotificationFromUEB(final String notificationFromUEB) throws InterruptedException {
- ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE);
- ScheduledFuture<?> scheduledFuture = executorPool.scheduleAtFixedRate(createNotificationConsumer(), 0, 100, TimeUnit.MILLISECONDS);
-
- await().atMost(Durations.TWO_HUNDRED_MILLISECONDS).until(() -> !scheduledFuture.isDone());
-
- List<String> nonHeatNotification = Collections.singletonList(notificationFromUEB);
- notificationsQueue.add(nonHeatNotification);
- Thread.sleep(800);
- executorPool.shutdown();
- }
-
- private String getAsdcServiceNotificationWithHeatArtifact() {
- return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n"
- + " \"serviceVersion\" : \"1.0\",\r\n"
- + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n"
- + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n"
- + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n"
- + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n"
- + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n"
- + " \"artifactType\" : \"YANG_XML\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n"
- + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n"
- + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n"
- + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n"
- + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n"
- + " \"artifactType\" : \"HEAT\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n"
- + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n"
- + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n"
- + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n"
- + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n"
- + " \"artifactType\" : \"HEAT_ENV\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n"
- + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n"
- + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n"
- + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n"
- + " \"artifactVersion\" : \"1\",\r\n"
- + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n"
- + " }\r\n" + " ]}";
- }
-
- private String getNotificationWithMultipleResources() {
- return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" +
- " \"serviceName\" : \"Testnotificationser1\",\r\n" +
- " \"serviceVersion\" : \"1.0\",\r\n" +
- " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" +
- " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" +
- " \"resources\" : [{\r\n" +
- " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" +
- " \"resourceName\" : \"TestNotificationVF1\",\r\n" +
- " \"resourceVersion\" : \"1.0\",\r\n" +
- " \"resoucreType\" : \"VF\",\r\n" +
- " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" +
- " \"artifacts\" : [{\r\n" +
- " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" +
- " \"artifactType\" : \"YANG_XML\",\r\n" +
- " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n"
- +
- " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" +
- " \"artifactDescription\" : \"MyYang\",\r\n" +
- " \"artifactTimeout\" : 0,\r\n" +
- " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" +
- " \"artifactVersion\" : \"1\"\r\n" +
- " }" +
- " ]\r\n" +
- " },\r\n" +
- " {\r\n" +
- " \"resourceInstanceName\" : \"testnotificationvf12\",\r\n" +
- " \"resourceName\" : \"TestNotificationVF1\",\r\n" +
- " \"resourceVersion\" : \"1.0\",\r\n" +
- " \"resoucreType\" : \"VF\",\r\n" +
- " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2e\",\r\n" +
- " \"artifacts\" : [{\r\n" +
- " \"artifactName\" : \"heat.yaml\",\r\n" +
- " \"artifactType\" : \"HEAT\",\r\n" +
- " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n"
- +
- " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" +
- " \"artifactDescription\" : \"heat\",\r\n" +
- " \"artifactTimeout\" : 60,\r\n" +
- " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" +
- " \"artifactVersion\" : \"1\"\r\n" +
- " }" +
- " ]\r\n" +
- " }\r\n" +
- " ]}";
- }
-
-
- private String getAsdcNotificationWithNonExistentArtifact() {
- return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n"
- + " \"serviceVersion\" : \"1.0\",\r\n"
- + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n"
- + " \"bugabuga\" : \"xyz\",\r\n" + " \"resources\" : [{\r\n"
- + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" + " \"resourceName\" : \"TestNotificationVF1\",\r\n"
- + " \"resourceVersion\" : \"1.0\",\r\n" + " \"resoucreType\" : \"VF\",\r\n"
- + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" + " \"artifacts\" : [{\r\n"
- + " \"artifactName\" : \"heat.yaml\",\r\n" + " \"artifactType\" : \"HEAT\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n"
- + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n"
- + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n"
- + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n"
- + " \"artifactBuga\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n"
- + " }, {\r\n" + " \"artifactName\" : \"buga.bug\",\r\n" + " \"artifactType\" : \"BUGA_BUGA\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n"
- + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n"
- + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n"
- + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n"
- + " \"artifactVersion\" : \"1\",\r\n"
- + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n"
- + " }\r\n" + " ]}";
- }
-
- private String getAsdcServiceNotificationWithRelatedArtifacts() {
- return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n"
- + " \"serviceVersion\" : \"1.0\",\r\n"
- + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n"
- + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n"
- + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n"
- + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n"
- + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n"
- + " \"artifactType\" : \"YANG_XML\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n"
- + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n"
- + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n"
- + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\",\r\n"
- + " \"relatedArtifacts\" : [\r\n"
- + " \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\"\r\n" + " ]" + " }, {\r\n"
- + " \"artifactName\" : \"heat.yaml\",\r\n"
- + " \"artifactType\" : \"HEAT\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n"
- + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n"
- + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n"
- + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\", \r\n"
- + " \"relatedArtifacts\" : [\r\n"
- + " \"0005bc4a-2c19-452e-be6d-d574a56be4d0\", \r\n" + " \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\"\r\n"
- + " ]" + " }, {\r\n"
- + " \"artifactName\" : \"heat.env\",\r\n" + " \"artifactType\" : \"HEAT_ENV\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n"
- + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n"
- + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n"
- + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n"
- + " \"artifactVersion\" : \"1\",\r\n"
- + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n"
- + " }\r\n" + " ]}";
- }
-
- private String getAsdcServiceNotificationWithoutHeatArtifact() {
- return "{" + " \"distributionID\" : \"5v1234d8-5b6d-42c4-7t54-47v95n58qb7\"," + " \"serviceName\" : \"srv1\","
- + " \"serviceVersion\": \"2.0\"," + " \"serviceUUID\" : \"4e0697d8-5b6d-42c4-8c74-46c33d46624c\","
- + " \"serviceArtifacts\":[" + " {" + " \"artifactName\" : \"ddd.yml\","
- + " \"artifactType\" : \"DG_XML\"," + " \"artifactTimeout\" : \"65\","
- + " \"artifactDescription\" : \"description\"," + " \"artifactURL\" :"
- + " \"/sdc/v1/catalog/services/srv1/2.0/resources/ddd/3.0/artifacts/ddd.xml\" ,"
- + " \"resourceUUID\" : \"4e5874d8-5b6d-42c4-8c74-46c33d90drw\" ,"
- + " \"checksum\" : \"15e389rnrp58hsw==\"" + " }" + " ]" + "}";
- }
-
- private String getNotificationWithServiceArtifatcs() {
- return "{\r\n" + " \"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n"
- + " \"serviceVersion\" : \"1.0\",\r\n"
- + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n"
- + " \"serviceArtifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n"
- + " \"artifactType\" : \"YANG_XML\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n"
- + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n"
- + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n"
- + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n"
- + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n"
- + " \"artifactType\" : \"HEAT\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n"
- + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n"
- + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n"
- + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n"
- + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n"
- + " \"artifactType\" : \"HEAT_ENV\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n"
- + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n"
- + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n"
- + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n"
- + " \"artifactVersion\" : \"1\",\r\n"
- + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ],\r\n"
- + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n"
- + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n"
- + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n"
- + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n"
- + " \"artifactType\" : \"YANG_XML\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n"
- + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n"
- + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n"
- + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n"
- + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n"
- + " \"artifactType\" : \"HEAT\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n"
- + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n"
- + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n"
- + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n"
- + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n"
- + " \"artifactType\" : \"HEAT_ENV\",\r\n"
- + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n"
- + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n"
- + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n"
- + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n"
- + " \"artifactVersion\" : \"1\",\r\n"
- + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" + " }\r\n"
- + " ]\r\n" + "}";
- }
-
- private <T> int countInstances(List<T> list, T element) {
- int count = 0;
- for (T curr : list) {
- if (curr.equals(element)) {
- count++;
- }
- }
- return count;
- }
-}
+ private final SdcKafkaConsumer consumer = mock(SdcKafkaConsumer.class);
+ private final INotificationCallback clientCallback = spy(INotificationCallback.class);
+ private final Queue<Iterable<String>> notificationsQueue = new LinkedList<>();
+ private final DistributionClientImpl distributionClient = Mockito.spy(DistributionClientImpl.class);
+ private List<String> artifactsTypes = List.of(ArtifactTypeEnum.HEAT.name());
+ private final List<Boolean> notificationStatusResults = new ArrayList<>();
+ final static IDistributionClientResult DISTRIBUTION_SUCCESS_RESULT = buildSuccessResult();
+
+ private NotificationConsumer createNotificationConsumer() {
+ return new NotificationConsumer(consumer, clientCallback, artifactsTypes, distributionClient);
+ }
+
+ @BeforeEach
+ public void beforeTest() {
+ Mockito.reset(clientCallback, distributionClient);
+ when(consumer.poll()).then((Answer<Iterable<String>>) invocation -> {
+ if (!notificationsQueue.isEmpty()) {
+ return notificationsQueue.remove();
+ } else {
+ return new ArrayList<>();
+ }
+ });
+ when(distributionClient.sendNotificationStatus(Mockito.anyLong(), Mockito.anyString(), Mockito.any(ArtifactInfoImpl.class), Mockito.anyBoolean())).then(
+ (Answer<IDistributionClientResult>) invocation -> {
+ boolean isNotified = (boolean) invocation.getArguments()[3];
+ notificationStatusResults.add(isNotified);
+ return DISTRIBUTION_SUCCESS_RESULT;
+ });
+
+ }
+
+ private static IDistributionClientResult buildSuccessResult() {
+ return new IDistributionClientResult() {
+
+ @Override
+ public String getDistributionMessageResult() {
+ return "";
+ }
+
+ @Override
+ public DistributionActionResultEnum getDistributionActionResult() {
+ return DistributionActionResultEnum.SUCCESS;
+ }
+ };
+ }
+
+ @Test
+ void testNoNotifiactionsSent() throws InterruptedException {
+
+ ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE);
+ executorPool.scheduleAtFixedRate(createNotificationConsumer(), 0, 100, TimeUnit.MILLISECONDS);
+
+ Thread.sleep(1000);
+ executorPool.shutdown();
+
+ Mockito.verify(clientCallback, Mockito.times(0)).activateCallback(Mockito.any(INotificationData.class));
+
+ }
+
+ @Test
+ void testNonRelevantNotificationSent() throws InterruptedException {
+
+ simulateNotificationFromMessageBus(getSdcServiceNotificationWithoutHeatArtifact());
+ Mockito.verify(clientCallback, Mockito.times(0)).activateCallback(Mockito.any(INotificationData.class));
+
+ }
+
+ @Test
+ void testRelevantNotificationSent() throws InterruptedException {
+ simulateNotificationFromMessageBus(getSdcServiceNotificationWithHeatArtifact());
+ Mockito.verify(clientCallback, Mockito.times(1)).activateCallback(Mockito.any(INotificationData.class));
+
+ }
+
+ @Test
+ void testNonExistingArtifactsNotificationSent() throws InterruptedException {
+ simulateNotificationFromMessageBus(getSdcNotificationWithNonExistentArtifact());
+ Mockito.verify(clientCallback, Mockito.times(1)).activateCallback(Mockito.any(INotificationData.class));
+
+ }
+
+ @Test
+ void testNotificationStatusSent() throws InterruptedException {
+ simulateNotificationFromMessageBus(getSdcServiceNotificationWithHeatArtifact());
+
+ Mockito.verify(distributionClient, Mockito.times(3)).sendNotificationStatus(Mockito.anyLong(), Mockito.anyString(), Mockito.any(ArtifactInfoImpl.class), Mockito.anyBoolean());
+ assertEquals(1, countInstances(notificationStatusResults, Boolean.TRUE));
+ assertEquals(2, countInstances(notificationStatusResults, Boolean.FALSE));
+ }
+
+ @Test
+ void testNotificationRelatedArtifacts() throws InterruptedException {
+ List<String> artifactTypesTmp = new ArrayList<>();
+ for (ArtifactTypeEnum artifactTypeEnum : ArtifactTypeEnum.values()) {
+ artifactTypesTmp.add(artifactTypeEnum.name());
+ }
+ artifactsTypes = artifactTypesTmp;
+ simulateNotificationFromMessageBus(getSdcServiceNotificationWithRelatedArtifacts());
+
+ Mockito.verify(distributionClient, Mockito.times(3)).sendNotificationStatus(Mockito.anyLong(), Mockito.anyString(), Mockito.any(ArtifactInfoImpl.class), Mockito.anyBoolean());
+ assertEquals(3, countInstances(notificationStatusResults, Boolean.TRUE));
+ assertEquals(0, countInstances(notificationStatusResults, Boolean.FALSE));
+ }
+
+ @Test
+ void testNotificationStatusWithServiceArtifatcs() throws InterruptedException {
+ simulateNotificationFromMessageBus(getNotificationWithServiceArtifatcs());
+ Mockito.verify(distributionClient, Mockito.times(6)).sendNotificationStatus(Mockito.anyLong(), Mockito.anyString(), Mockito.any(ArtifactInfoImpl.class), Mockito.anyBoolean());
+ assertEquals(2, countInstances(notificationStatusResults, Boolean.TRUE));
+ assertEquals(4, countInstances(notificationStatusResults, Boolean.FALSE));
+
+ }
+
+ @Test
+ final void testBuildCallbackNotificationLogicFlagIsFalse() {
+ NotificationConsumer consumer = createNotificationConsumer();
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ TestConfiguration testConfiguration = new TestConfiguration();
+ testConfiguration.setFilterInEmptyResources(false);
+ when(distributionClient.getConfiguration()).thenReturn(testConfiguration);
+ NotificationDataImpl notification = gson.fromJson(getNotificationWithMultipleResources(), NotificationDataImpl.class);
+ NotificationDataImpl notificationBuiltInClient = consumer.buildCallbackNotificationLogic(0, notification);
+ assertEquals(1, notificationBuiltInClient.getResources().size());
+ }
+
+ @Test
+ final void testBuildCallbackNotificationLogicFlagIsTrue() {
+ NotificationConsumer consumer = createNotificationConsumer();
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ TestConfiguration testConfiguration = new TestConfiguration();
+ testConfiguration.setFilterInEmptyResources(true);
+ when(distributionClient.getConfiguration()).thenReturn(testConfiguration);
+ NotificationDataImpl notification = gson.fromJson(getNotificationWithMultipleResources(), NotificationDataImpl.class);
+ NotificationDataImpl notificationBuiltInClient = consumer.buildCallbackNotificationLogic(0, notification);
+ assertEquals(2, notificationBuiltInClient.getResources().size());
+ }
+
+ private void simulateNotificationFromMessageBus(final String notificationFromMessageBus) throws InterruptedException {
+ ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE);
+ executorPool.scheduleAtFixedRate(createNotificationConsumer(), 0, 100, TimeUnit.MILLISECONDS);
+
+ Thread.sleep(200);
+
+ List<String> nonHeatNotification = List.of(notificationFromMessageBus);
+ notificationsQueue.add(nonHeatNotification);
+ Thread.sleep(800);
+ executorPool.shutdown();
+ }
+
+ private String getSdcServiceNotificationWithHeatArtifact() {
+ return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" + " \"serviceVersion\" : \"1.0\",\r\n"
+ + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n"
+ + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n" + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n"
+ + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" + " \"artifactType\" : \"YANG_XML\",\r\n"
+ + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n"
+ + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n"
+ + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n"
+ + " \"artifactType\" : \"HEAT\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n"
+ + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n"
+ + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n"
+ + " \"artifactType\" : \"HEAT_ENV\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n"
+ + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n"
+ + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" + " \"artifactVersion\" : \"1\",\r\n"
+ + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" + " }\r\n" + " ]}";
+ }
+
+ private String getNotificationWithMultipleResources(){
+ return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" +
+ " \"serviceName\" : \"Testnotificationser1\",\r\n" +
+ " \"serviceVersion\" : \"1.0\",\r\n" +
+ " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" +
+ " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" +
+ " \"resources\" : [{\r\n" +
+ " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" +
+ " \"resourceName\" : \"TestNotificationVF1\",\r\n" +
+ " \"resourceVersion\" : \"1.0\",\r\n" +
+ " \"resoucreType\" : \"VF\",\r\n" +
+ " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" +
+ " \"artifacts\" : [{\r\n" +
+ " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" +
+ " \"artifactType\" : \"YANG_XML\",\r\n" +
+ " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n" +
+ " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" +
+ " \"artifactDescription\" : \"MyYang\",\r\n" +
+ " \"artifactTimeout\" : 0,\r\n" +
+ " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" +
+ " \"artifactVersion\" : \"1\"\r\n" +
+ " }" +
+ " ]\r\n" +
+ " },\r\n" +
+ " {\r\n" +
+ " \"resourceInstanceName\" : \"testnotificationvf12\",\r\n" +
+ " \"resourceName\" : \"TestNotificationVF1\",\r\n" +
+ " \"resourceVersion\" : \"1.0\",\r\n" +
+ " \"resoucreType\" : \"VF\",\r\n" +
+ " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2e\",\r\n" +
+ " \"artifacts\" : [{\r\n" +
+ " \"artifactName\" : \"heat.yaml\",\r\n" +
+ " \"artifactType\" : \"HEAT\",\r\n" +
+ " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" +
+ " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" +
+ " \"artifactDescription\" : \"heat\",\r\n" +
+ " \"artifactTimeout\" : 60,\r\n" +
+ " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" +
+ " \"artifactVersion\" : \"1\"\r\n" +
+ " }" +
+ " ]\r\n" +
+ " }\r\n" +
+ " ]}";
+ }
+
+
+ private String getSdcNotificationWithNonExistentArtifact() {
+ return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" + " \"serviceVersion\" : \"1.0\",\r\n"
+ + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" + " \"bugabuga\" : \"xyz\",\r\n" + " \"resources\" : [{\r\n"
+ + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n" + " \"resoucreType\" : \"VF\",\r\n"
+ + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n" + " \"artifactType\" : \"HEAT\",\r\n"
+ + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n"
+ + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n"
+ + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactBuga\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n"
+ + " }, {\r\n" + " \"artifactName\" : \"buga.bug\",\r\n" + " \"artifactType\" : \"BUGA_BUGA\",\r\n"
+ + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n"
+ + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n"
+ + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" + " \"artifactVersion\" : \"1\",\r\n"
+ + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" + " }\r\n" + " ]}";
+ }
+
+ private String getSdcServiceNotificationWithRelatedArtifacts() {
+ return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" + " \"serviceVersion\" : \"1.0\",\r\n"
+ + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n"
+ + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n" + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n"
+ + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" + " \"artifactType\" : \"YANG_XML\",\r\n"
+ + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n"
+ + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n"
+ + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\",\r\n" + " \"relatedArtifacts\" : [\r\n"
+ + " \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\"\r\n" + " ]" + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n"
+ + " \"artifactType\" : \"HEAT\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n"
+ + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n"
+ + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\", \r\n" + " \"relatedArtifacts\" : [\r\n"
+ + " \"0005bc4a-2c19-452e-be6d-d574a56be4d0\", \r\n" + " \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\"\r\n" + " ]" + " }, {\r\n"
+ + " \"artifactName\" : \"heat.env\",\r\n" + " \"artifactType\" : \"HEAT_ENV\",\r\n"
+ + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n"
+ + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n"
+ + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" + " \"artifactVersion\" : \"1\",\r\n"
+ + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" + " }\r\n" + " ]}";
+ }
+
+ private String getSdcServiceNotificationWithoutHeatArtifact() {
+ return "{" + " \"distributionID\" : \"5v1234d8-5b6d-42c4-7t54-47v95n58qb7\"," + " \"serviceName\" : \"srv1\"," + " \"serviceVersion\": \"2.0\"," + " \"serviceUUID\" : \"4e0697d8-5b6d-42c4-8c74-46c33d46624c\","
+ + " \"serviceArtifacts\":[" + " {" + " \"artifactName\" : \"ddd.yml\"," + " \"artifactType\" : \"DG_XML\"," + " \"artifactTimeout\" : \"65\","
+ + " \"artifactDescription\" : \"description\"," + " \"artifactURL\" :" + " \"/sdc/v1/catalog/services/srv1/2.0/resources/ddd/3.0/artifacts/ddd.xml\" ,"
+ + " \"resourceUUID\" : \"4e5874d8-5b6d-42c4-8c74-46c33d90drw\" ," + " \"checksum\" : \"15e389rnrp58hsw==\"" + " }" + " ]" + "}";
+ }
+
+ private String getNotificationWithServiceArtifatcs() {
+ return "{\r\n" + " \"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" + " \"serviceVersion\" : \"1.0\",\r\n"
+ + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" + " \"serviceArtifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n"
+ + " \"artifactType\" : \"YANG_XML\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n"
+ + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n"
+ + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n"
+ + " \"artifactType\" : \"HEAT\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n"
+ + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n"
+ + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n"
+ + " \"artifactType\" : \"HEAT_ENV\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n"
+ + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n"
+ + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" + " \"artifactVersion\" : \"1\",\r\n"
+ + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ],\r\n" + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n"
+ + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n" + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n"
+ + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" + " \"artifactType\" : \"YANG_XML\",\r\n"
+ + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n"
+ + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n"
+ + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n"
+ + " \"artifactType\" : \"HEAT\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n"
+ + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n"
+ + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n"
+ + " \"artifactType\" : \"HEAT_ENV\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n"
+ + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n"
+ + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" + " \"artifactVersion\" : \"1\",\r\n"
+ + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" + " }\r\n" + " ]\r\n" + "}";
+ }
+
+ private <T> int countInstances(List<T> list, T element) {
+ int count = 0;
+ for (T curr : list) {
+ if (curr.equals(element)) {
+ count++;
+ }
+ }
+ return count;
+ }
+} \ No newline at end of file
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java
index 4d61542..ed15094 100644
--- a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java
@@ -20,115 +20,71 @@
package org.onap.sdc.utils;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaPublisher;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.Future;
+import nl.altindag.log.LogCaptor;
+import org.apache.kafka.common.KafkaException;
import org.junit.jupiter.api.Test;
import org.onap.sdc.api.results.IDistributionClientResult;
import org.onap.sdc.impl.DistributionClientResultImpl;
+import org.onap.sdc.utils.kafka.SdcKafkaProducer;
class NotificationSenderTest {
private final String status = "status";
- private final CambriaPublisher.message message = new CambriaPublisher.message("sample-partition", "sample-message");
- private final List<CambriaPublisher.message> notEmptySendingFailedMessages = Collections.singletonList(message);
+
private final DistributionClientResultImpl successResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS,
"Messages successfully sent");
- private final DistributionClientResultImpl generalErrorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR,
- "Failed to send status");
-
- private final CambriaBatchingPublisher publisher = mock(CambriaBatchingPublisher.class);
- private final List<String> emptyServers = Collections.emptyList();
- private final NotificationSender validNotificationSender = new NotificationSender(emptyServers);
+ private final DistributionClientResultImpl generalErrorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
+ private final SdcKafkaProducer producer = mock(SdcKafkaProducer.class);
+ private final NotificationSender validNotificationSender = new NotificationSender(producer);
@Test
- void whenPublisherIsValidAndNoExceptionsAreThrownShouldReturnSuccessStatus() throws IOException, InterruptedException {
+ void whenPublisherIsValidAndNoExceptionsAreThrownShouldReturnSuccessStatus() {
//given
- when(publisher.send(anyString(), anyString())).thenReturn(0);
- when(publisher.close(anyLong(), any())).thenReturn(Collections.emptyList());
+ when(producer.send(anyString(), anyString(), anyString())).thenReturn(mock(Future.class));
//when
- IDistributionClientResult result = validNotificationSender.send(publisher, status);
+ IDistributionClientResult result = validNotificationSender.send("mytopic", status);
//then
assertEquals(successResponse.getDistributionActionResult(), result.getDistributionActionResult());
}
@Test
- void whenPublisherCouldNotSendShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+ void whenPublisherCouldNotSendShouldReturnGeneralErrorStatus() {
//given
- when(publisher.send(anyString(), anyString())).thenReturn(0);
- when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
+ when(producer.send(anyString(), anyString(), anyString())).thenReturn(mock(Future.class));
+ doThrow(KafkaException.class)
+ .when(producer)
+ .flush();
//when
- IDistributionClientResult result = validNotificationSender.send(publisher, status);
+ IDistributionClientResult result = validNotificationSender.send("mytopic", status);
//then
assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
}
@Test
- void whenSendingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
- //given
- when(publisher.send(anyString(), anyString())).thenThrow(new IOException());
- when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
-
- //when
- IDistributionClientResult result = validNotificationSender.send(publisher, status);
+ void whenSendingThrowsIOExceptionShouldReturnGeneralErrorStatus() {
+ LogCaptor logCaptor = LogCaptor.forClass(NotificationSender.class);
- //then
- assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
- }
-
- @Test
- void whenSendingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
//given
- when(publisher.send(anyString(), anyString())).thenAnswer(invocationOnMock -> {
- throw new InterruptedException();
- });
- when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
+ when(producer.send(anyString(), anyString(), anyString())).thenThrow(new KafkaException());
//when
- IDistributionClientResult result = validNotificationSender.send(publisher, status);
+ validNotificationSender.send("mytopic", status);
//then
- assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ assertThat(logCaptor.getLogs()).contains("DistributionClient - sendStatus. Failed to send status");
}
- @Test
- void whenClosingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
- //given
- when(publisher.send(anyString(), anyString())).thenReturn(0);
- when(publisher.close(anyLong(), any())).thenThrow(new IOException());
-
- //when
- IDistributionClientResult result = validNotificationSender.send(publisher, status);
-
- //then
- assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
- }
-
- @Test
- void whenClosingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
- //given
- when(publisher.send(anyString(), anyString())).thenReturn(0);
- when(publisher.close(anyLong(), any())).thenAnswer(invocationOnMock -> {
- throw new InterruptedException();
- });
-
- //when
- IDistributionClientResult result = validNotificationSender.send(publisher, status);
-
- //then
- assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
- }
}
+
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java
new file mode 100644
index 0000000..744e9cc
--- /dev/null
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java
@@ -0,0 +1,104 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.sdc.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.contentOf;
+
+import com.salesforce.kafka.test.KafkaTestCluster;
+import com.salesforce.kafka.test.KafkaTestUtils;
+import com.salesforce.kafka.test.listeners.BrokerListener;
+import com.salesforce.kafka.test.listeners.SaslPlainListener;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
+import org.onap.sdc.api.consumer.IConfiguration;
+import org.onap.sdc.impl.Configuration;
+import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
+import org.onap.sdc.utils.kafka.SdcKafkaProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;")
+class SdcKafkaTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(SdcKafkaTest.class);
+
+ private static final Configuration configuration = new Configuration(new TestConfiguration());
+ private static KafkaTestCluster kafkaTestCluster = null;
+ private static final String topicName = "my-test-topic";
+
+ static {
+ System.setProperty("java.security.auth.login.config", "src/test/resources/jaas.conf");
+ }
+
+ @BeforeAll
+ static void before() throws Exception {
+ startKafkaService();
+ KafkaTestUtils utils = new KafkaTestUtils(kafkaTestCluster);
+ utils.createTopic(topicName, 1, (short) 1);
+ configuration.setMsgBusAddress(Collections.singletonList(kafkaTestCluster.getKafkaConnectString()));
+ }
+
+ @AfterAll
+ static void after() throws Exception {
+ kafkaTestCluster.close();
+ kafkaTestCluster.stop();
+ }
+
+ @Test
+ void whenProducingCorrectRecordsArePresent() {
+ SdcKafkaConsumer consumer = new SdcKafkaConsumer(configuration);
+ consumer.subscribe(topicName);
+ consumer.poll();
+
+ SdcKafkaProducer producer = new SdcKafkaProducer(configuration);
+ producer.send(topicName, "blah", "blah");
+ producer.send(topicName, "blah", "blah");
+ producer.send(topicName, "blah", "blah");
+ producer.flush();
+
+ List<String> events = consumer.poll();
+
+ assertThat(events).hasSize(3);
+ }
+
+ private static void startKafkaService() throws Exception {
+ final BrokerListener listener = new SaslPlainListener()
+ .withUsername("kafkaclient")
+ .withPassword("client-secret");
+
+ final Properties brokerProperties = new Properties();
+ kafkaTestCluster = new KafkaTestCluster(
+ 1,
+ brokerProperties,
+ Collections.singletonList(listener)
+ );
+
+ kafkaTestCluster.start();
+ logger.debug("Cluster started at: {}", kafkaTestCluster.getKafkaConnectString());
+ }
+} \ No newline at end of file
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java
index ca7abba..c2fc536 100644
--- a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java
@@ -22,12 +22,11 @@ package org.onap.sdc.utils;
import java.util.ArrayList;
import java.util.List;
-
import org.onap.sdc.api.consumer.IConfiguration;
public class TestConfiguration implements IConfiguration {
- private String asdcAddress;
+ private String sdcAddress;
private String user;
private String password;
private int pollingInterval = DistributionClientConstants.MIN_POLLING_INTERVAL_SEC;
@@ -36,11 +35,13 @@ public class TestConfiguration implements IConfiguration {
private String consumerGroup;
private String environmentName;
private String comsumerID;
+ private final String kafkaSecurityProtocolConfig;
+ private final String kafkaSaslMechanism;
+ private final String kafkaSaslJaasConfig;
private String keyStorePath;
private String keyStorePassword;
private boolean activateServerTLSAuth;
private boolean isFilterInEmptyResources;
- private boolean useHttpsWithDmaap;
private boolean useHttpsWithSDC;
private List<String> msgBusAddress;
private String httpProxyHost;
@@ -48,11 +49,16 @@ public class TestConfiguration implements IConfiguration {
private String httpsProxyHost;
private int httpsProxyPort;
private boolean useSystemProxy;
+ private String sdcStatusTopicName;
+ private String sdcNotificationTopicName;
public TestConfiguration(IConfiguration other) {
- this.asdcAddress = other.getAsdcAddress();
+ this.sdcAddress = other.getSdcAddress();
this.comsumerID = other.getConsumerID();
this.consumerGroup = other.getConsumerGroup();
+ this.kafkaSecurityProtocolConfig = other.getKafkaSecurityProtocolConfig();
+ this.kafkaSaslMechanism = other.getKafkaSaslMechanism();
+ this.kafkaSaslJaasConfig = other.getKafkaSaslJaasConfig();
this.environmentName = other.getEnvironmentName();
this.password = other.getPassword();
this.pollingInterval = other.getPollingInterval();
@@ -71,30 +77,37 @@ public class TestConfiguration implements IConfiguration {
}
public TestConfiguration() {
- this.asdcAddress = "localhost:8443";
+ this.sdcAddress = "localhost:8443";
this.comsumerID = "mso-123456";
this.consumerGroup = "mso-group";
this.environmentName = "PROD";
this.password = "password";
this.pollingInterval = 20;
this.pollingTimeout = 20;
- this.relevantArtifactTypes = new ArrayList<String>();
+ this.setSdcStatusTopicName("SDC-STATUS-TOPIC");
+ this.setSdcNotificationTopicName("SDC-NOTIF-TOPIC");
+ this.relevantArtifactTypes = new ArrayList<>();
this.relevantArtifactTypes.add(ArtifactTypeEnum.HEAT.name());
this.user = "mso-user";
- this.keyStorePath = "etc/asdc-client.jks";
+ this.keyStorePath = "etc/sdc-client.jks";
this.keyStorePassword = "Aa123456";
this.activateServerTLSAuth = false;
this.isFilterInEmptyResources = false;
this.useHttpsWithSDC = true;
- msgBusAddress = new ArrayList<String>();
- msgBusAddress.add("www.cnn.com");
- msgBusAddress.add("www.cnn.com");
- msgBusAddress.add("www.cnn.com");
+ msgBusAddress = new ArrayList<>();
+ msgBusAddress.add("kafka-bootstrap1:9092");
+ msgBusAddress.add("kafka-bootstrap2:9092");
+ msgBusAddress.add("kafka-bootstrap3:9092");
+ this.kafkaSecurityProtocolConfig = "SASL_PLAINTEXT";
+ this.kafkaSaslMechanism = "PLAIN";
+ this.kafkaSaslJaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;";
+ this.httpProxyHost = "proxy";
+ this.httpProxyPort = 8080;
}
@Override
- public String getAsdcAddress() {
- return asdcAddress;
+ public String getSdcAddress() {
+ return sdcAddress;
}
@Override
@@ -103,6 +116,21 @@ public class TestConfiguration implements IConfiguration {
}
@Override
+ public String getKafkaSecurityProtocolConfig() {
+ return kafkaSecurityProtocolConfig;
+ }
+
+ @Override
+ public String getKafkaSaslMechanism() {
+ return kafkaSaslMechanism;
+ }
+
+ @Override
+ public String getKafkaSaslJaasConfig() {
+ return kafkaSaslJaasConfig;
+ }
+
+ @Override
public String getUser() {
return user;
}
@@ -185,8 +213,8 @@ public class TestConfiguration implements IConfiguration {
this.comsumerID = comsumerID;
}
- public void setAsdcAddress(String asdcAddress) {
- this.asdcAddress = asdcAddress;
+ public void setSdcAddress(String sdcAddress) {
+ this.sdcAddress = sdcAddress;
}
public void setUser(String user) {
@@ -249,7 +277,7 @@ public class TestConfiguration implements IConfiguration {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((asdcAddress == null) ? 0 : asdcAddress.hashCode());
+ result = prime * result + ((sdcAddress == null) ? 0 : sdcAddress.hashCode());
result = prime * result + ((comsumerID == null) ? 0 : comsumerID.hashCode());
result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
result = prime * result + ((environmentName == null) ? 0 : environmentName.hashCode());
@@ -280,10 +308,10 @@ public class TestConfiguration implements IConfiguration {
if (getClass() != obj.getClass())
return false;
TestConfiguration other = (TestConfiguration) obj;
- if (asdcAddress == null) {
- if (other.asdcAddress != null)
+ if (sdcAddress == null) {
+ if (other.sdcAddress != null)
return false;
- } else if (!asdcAddress.equals(other.asdcAddress))
+ } else if (!sdcAddress.equals(other.sdcAddress))
return false;
if (comsumerID == null) {
if (other.comsumerID != null)
@@ -325,17 +353,14 @@ public class TestConfiguration implements IConfiguration {
} else if (!keyStorePath.equals(other.keyStorePath))
return false;
if (keyStorePassword == null) {
- if (other.keyStorePassword != null)
- return false;
- } else if (!keyStorePassword.equals(other.keyStorePassword))
- return false;
-
- return true;
+ return other.keyStorePassword == null;
+ } else
+ return keyStorePassword.equals(other.keyStorePassword);
}
@Override
public String toString() {
- return "TestConfiguration [asdcAddress=" + asdcAddress + ", user=" + user + ", password=" + password
+ return "TestConfiguration [sdcAddress=" + sdcAddress + ", user=" + user + ", password=" + password
+ ", pollingInterval=" + pollingInterval + ", pollingTimeout=" + pollingTimeout
+ ", relevantArtifactTypes=" + relevantArtifactTypes + ", consumerGroup=" + consumerGroup
+ ", environmentName=" + environmentName + ", comsumerID=" + comsumerID + "]";
@@ -351,11 +376,6 @@ public class TestConfiguration implements IConfiguration {
}
@Override
- public Boolean isUseHttpsWithDmaap() {
- return this.useHttpsWithDmaap;
- }
-
- @Override
public Boolean isUseHttpsWithSDC() {
return this.useHttpsWithSDC;
}
@@ -363,4 +383,20 @@ public class TestConfiguration implements IConfiguration {
public void setUseHttpsWithSDC(Boolean useHttpsWithSDC) {
this.useHttpsWithSDC = useHttpsWithSDC;
}
+
+ public String getSdcStatusTopicName() {
+ return sdcStatusTopicName;
+ }
+
+ public void setSdcStatusTopicName(String sdcStatusTopicName) {
+ this.sdcStatusTopicName = sdcStatusTopicName;
+ }
+
+ public String getSdcNotificationTopicName() {
+ return sdcNotificationTopicName;
+ }
+
+ public void setSdcNotificationTopicName(String sdcNotificationTopicName) {
+ this.sdcNotificationTopicName = sdcNotificationTopicName;
+ }
}
diff --git a/sdc-distribution-client/src/test/resources/jaas.conf b/sdc-distribution-client/src/test/resources/jaas.conf
new file mode 100644
index 0000000..6f7fb5a
--- /dev/null
+++ b/sdc-distribution-client/src/test/resources/jaas.conf
@@ -0,0 +1,20 @@
+KafkaServer {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="admin"
+ password="admin-secret"
+ user_admin="admin-secret"
+ user_kafkaclient="client-secret";
+};
+
+Server {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="admin"
+ password="admin-secret"
+ user_zooclient="client-secret";
+};
+
+Client {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="zooclient"
+ password="client-secret";
+}; \ No newline at end of file
diff --git a/sdc-distribution-client/src/test/resources/log4j.xml b/sdc-distribution-client/src/test/resources/log4j.xml
new file mode 100644
index 0000000..75e94c1
--- /dev/null
+++ b/sdc-distribution-client/src/test/resources/log4j.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration debug="true" xmlns:log4j='http://jakarta.apache.org/log4j/'>
+
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+ </layout>
+ </appender>
+
+ <!-- categories -->
+ <category name="kafka">
+ <priority value="WARN" />
+ </category>
+ <category name="kafka.client">
+ <priority value="INFO" />
+ </category>
+ <category name="org.apache.zookeeper">
+ <priority value="WARN" />
+ </category>
+ <root>
+ <level value="INFO" />
+ <appender-ref ref="console" />
+ </root>
+
+</log4j:configuration> \ No newline at end of file
diff --git a/sdc-distribution-client/src/test/resources/asdc-client.jks b/sdc-distribution-client/src/test/resources/sdc-client.jks
index eb0a0d3..eb0a0d3 100644
--- a/sdc-distribution-client/src/test/resources/asdc-client.jks
+++ b/sdc-distribution-client/src/test/resources/sdc-client.jks
Binary files differ
diff --git a/version.properties b/version.properties
index d1722d6..890f74f 100644
--- a/version.properties
+++ b/version.properties
@@ -3,9 +3,9 @@
# Note that these variables cannot be structured (e.g. : version.release or version.snapshot etc... )
# because they are used in Jenkins, whose plug-in doesn't support
-major=1
-minor=4
-patch=5
+major=2
+minor=0
+patch=0
base_version=${major}.${minor}.${patch}