From cff56489f774f937654cb6eb198d3d5ef41418a2 Mon Sep 17 00:00:00 2001 From: efiacor Date: Thu, 16 Jun 2022 09:38:26 +0100 Subject: [STRIMZI] Migrate client from cambria to kafka native Add call to sdc to get kafka and topic details Add kafka config to IConfiguration interface Signed-off-by: efiacor Change-Id: Ibec77d1ff1cd25ad4adce133ee81d66e54c7707f Issue-ID: DMAAP-1745 --- README.md | 34 +- pom.xml | 16 +- sdc-distribution-ci/etc/asdc-client.jks | Bin 1177 -> 0 bytes sdc-distribution-ci/etc/asdcclientstore.jks | Bin 907 -> 0 bytes sdc-distribution-ci/etc/sdc-client.jks | Bin 0 -> 1177 bytes sdc-distribution-ci/etc/sdcclientstore.jks | Bin 0 -> 907 bytes sdc-distribution-ci/pom.xml | 22 +- .../test/core/config/DistributionClientConfig.java | 75 +-- .../test/core/service/ClientNotifyCallback.java | 6 +- .../org/onap/test/it/RegisterToAsdcTopicIT.java | 51 -- .../org/onap/test/it/RegisterToSdcTopicIT.java | 46 ++ .../test/core/service/ClientInitializerTest.java | 131 ++-- .../test/core/service/CustomKafkaContainer.java | 94 +++ .../src/test/resources/logback-test.xml | 14 + sdc-distribution-client/etc/asdc-client.jks | Bin 1177 -> 0 bytes sdc-distribution-client/etc/asdcclientstore.jks | Bin 907 -> 0 bytes sdc-distribution-client/etc/sdc-client.jks | Bin 0 -> 1177 bytes sdc-distribution-client/etc/sdcclientstore.jks | Bin 0 -> 907 bytes sdc-distribution-client/pom.xml | 81 ++- .../java/org/onap/sdc/api/IDistributionClient.java | 21 +- .../org/onap/sdc/api/asdc/RegistrationRequest.java | 56 -- .../org/onap/sdc/api/asdc/ServerListResponse.java | 35 -- .../org/onap/sdc/api/consumer/IConfiguration.java | 50 +- .../sdc/api/notification/INotificationData.java | 4 +- .../sdc/api/notification/IVfModuleMetadata.java | 2 +- .../src/main/java/org/onap/sdc/http/AsdcUrls.java | 34 -- .../java/org/onap/sdc/http/HttpAsdcClient.java | 177 ------ .../org/onap/sdc/http/HttpAsdcClientException.java | 27 - .../java/org/onap/sdc/http/HttpAsdcResponse.java | 71 --- .../java/org/onap/sdc/http/HttpClientFactory.java | 2 +- .../main/java/org/onap/sdc/http/HttpSdcClient.java | 177 ++++++ .../org/onap/sdc/http/HttpSdcClientException.java | 27 + .../java/org/onap/sdc/http/HttpSdcResponse.java | 71 +++ .../java/org/onap/sdc/http/IHttpAsdcClient.java | 35 -- .../java/org/onap/sdc/http/IHttpSdcClient.java | 35 ++ .../java/org/onap/sdc/http/SdcConnectorClient.java | 222 +++---- .../src/main/java/org/onap/sdc/http/SdcUrls.java | 30 + .../onap/sdc/http/TopicRegistrationResponse.java | 43 -- .../main/java/org/onap/sdc/impl/Configuration.java | 84 ++- .../org/onap/sdc/impl/ConfigurationValidator.java | 30 +- .../org/onap/sdc/impl/DistributionClientImpl.java | 482 ++++++--------- .../sdc/impl/DistributionClientResultImpl.java | 4 +- .../org/onap/sdc/impl/NotificationConsumer.java | 72 ++- .../java/org/onap/sdc/impl/StatusConsumer.java | 29 +- .../sdc/utils/DistributionActionResultEnum.java | 22 +- .../sdc/utils/DistributionClientConstants.java | 2 +- .../main/java/org/onap/sdc/utils/GeneralUtils.java | 20 - .../org/onap/sdc/utils/NotificationSender.java | 43 +- .../onap/sdc/utils/kafka/KafkaDataResponse.java | 35 ++ .../org/onap/sdc/utils/kafka/SdcKafkaConsumer.java | 100 ++++ .../org/onap/sdc/utils/kafka/SdcKafkaProducer.java | 98 +++ .../onap/sdc/api/asdc/RegistrationRequestTest.java | 45 -- .../onap/sdc/http/HttpAsdcClientResponseTest.java | 9 +- .../java/org/onap/sdc/http/HttpAsdcClientTest.java | 35 +- .../org/onap/sdc/http/HttpClientFactoryTest.java | 6 +- .../org/onap/sdc/http/SdcConnectorClientTest.java | 263 +++------ .../org/onap/sdc/impl/DistributionClientTest.java | 325 +++------- .../onap/sdc/impl/NotificationConsumerTest.java | 656 +++++++++------------ .../org/onap/sdc/utils/NotificationSenderTest.java | 94 +-- .../test/java/org/onap/sdc/utils/SdcKafkaTest.java | 104 ++++ .../java/org/onap/sdc/utils/TestConfiguration.java | 98 ++- .../src/test/resources/asdc-client.jks | Bin 1177 -> 0 bytes .../src/test/resources/jaas.conf | 20 + .../src/test/resources/log4j.xml | 26 + .../src/test/resources/sdc-client.jks | Bin 0 -> 1177 bytes version.properties | 6 +- 66 files changed, 2105 insertions(+), 2292 deletions(-) delete mode 100644 sdc-distribution-ci/etc/asdc-client.jks delete mode 100644 sdc-distribution-ci/etc/asdcclientstore.jks create mode 100644 sdc-distribution-ci/etc/sdc-client.jks create mode 100644 sdc-distribution-ci/etc/sdcclientstore.jks delete mode 100644 sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java create mode 100644 sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java create mode 100644 sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java create mode 100644 sdc-distribution-ci/src/test/resources/logback-test.xml delete mode 100644 sdc-distribution-client/etc/asdc-client.jks delete mode 100644 sdc-distribution-client/etc/asdcclientstore.jks create mode 100644 sdc-distribution-client/etc/sdc-client.jks create mode 100644 sdc-distribution-client/etc/sdcclientstore.jks delete mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/RegistrationRequest.java delete mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/ServerListResponse.java delete mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/http/AsdcUrls.java delete mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClient.java delete mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClientException.java delete mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcResponse.java create mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClient.java create mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClientException.java create mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcResponse.java delete mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpAsdcClient.java create mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpSdcClient.java create mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcUrls.java delete mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/http/TopicRegistrationResponse.java create mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/KafkaDataResponse.java create mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java create mode 100644 sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaProducer.java delete mode 100644 sdc-distribution-client/src/test/java/org/onap/sdc/api/asdc/RegistrationRequestTest.java create mode 100644 sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java delete mode 100644 sdc-distribution-client/src/test/resources/asdc-client.jks create mode 100644 sdc-distribution-client/src/test/resources/jaas.conf create mode 100644 sdc-distribution-client/src/test/resources/log4j.xml create mode 100644 sdc-distribution-client/src/test/resources/sdc-client.jks diff --git a/README.md b/README.md index 8a22140..f975823 100644 --- a/README.md +++ b/README.md @@ -23,18 +23,18 @@ Every client that wants to use the JAR, need to implement IConfiguration interfa Configuration parameters: -------------------------- -- AsdcAddress : ASDC Distribution Engine address. Value can be either hostname (with or without port), IP:port or FQDN (Fully Qualified Domain Name). -- User : User Name for ASDC distribution consumer authentication. -- Password : User Password for ASDC distribution consumer authentication. -- PollingInterval : Distribution Client Polling Interval towards UEB in seconds. Can Be reconfigured in runtime. -- PollingTimeout : Distribution Client Timeout in seconds waiting to UEB server response in each fetch interval. Can Be reconfigured in runtime. +- sdcAddress : SDC Distribution Engine address. Value can be either hostname (with or without port), IP:port or FQDN (Fully Qualified Domain Name). +- User : User Name for SDC distribution consumer authentication. +- Password : User Password for SDC distribution consumer authentication. +- PollingInterval : Distribution Client Polling Interval towards MessageBus in seconds. Can Be reconfigured in runtime. +- PollingTimeout : Distribution Client Timeout in seconds waiting to MessageBus server response in each fetch interval. Can Be reconfigured in runtime. - RelevantArtifactTypes : List of artifact types. If the service contains any of the artifacts in the list, the callback will be activated. Can Be reconfigured in runtime. - ConsumerGroup : Returns the consumer group defined for this ONAP component, if no consumer group is defined return null. -- EnvironmentName : Returns the environment name (testing, production etc... Can Be reconfigured in runtime. -- ConsumerID : Unique ID of ONAP component instance (e.x INSTAR name). -- KeyStorePath : Return full path to Client's Key Store that contains either CA certificate or the ASDC's public key (e.g /etc/keystore/asdc-client.jks). file will be deployed with asdc-distribution jar +- EnvironmentName : Returns the environment name (testing, production etc... Can Be reconfigured in runtime. +- ConsumerID : Unique ID of ONAP component instance (e.x INSTAR name). +- KeyStorePath : Return full path to Client's Key Store that contains either CA certificate or the SDC's public key (e.g /etc/keystore/sdc-client.jks). file will be deployed with sdc-distribution jar - KeyStorePassword : Return client's Key Store password. -- activateServerTLSAuth : Sets whether ASDC server TLS authentication is activated. If set to false, Key Store path and password are not needed to be set. +- activateServerTLSAuth : Sets whether SDC server TLS authentication is activated. If set to false, Key Store path and password are not needed to be set. - UseSystemProxy : If set to true, SDC Distribution Client will use system wide proxy configuration passed through JVM arguments. - HttpProxyHost : Optional config. If configured, SDC Distribution client will use this http proxy host with HTTP client. - HttpProxyPort : Mandatory if HttpProxyHost is configured. If configured, SDC Distribution client will use this https proxy port with HTTP client. @@ -48,16 +48,16 @@ package org.onap.conf; import java.util.ArrayList; import java.util.List; -import org.onap.asdc.api.consumer.IConfiguration; -import org.onap.asdc.utils.ArtifactTypeEnum; +import org.onap.sdc.api.consumer.IConfiguration; +import org.onap.sdc.utils.ArtifactTypeEnum; public class SimpleConfiguration implements IConfiguration{ int randomSeed; - String asdcAddress; + String sdcAddress; public SimpleConfiguration(){ randomSeed = ((int)(Math.random()*1000)); - asdcAddress = "127.0.0.1:8443"; + sdcAddress = "127.0.0.1:8443"; } public String getUser() { return "ci"; @@ -95,12 +95,12 @@ public class SimpleConfiguration implements IConfiguration{ return "unique-Consumer-Group"+randomSeed; } - public String getAsdcAddress() { - return asdcAddress; + public String getSdcAddress() { + return sdcAddress; } - public void setAsdcAddress(String asdcAddress) { - this.asdcAddress = asdcAddress; + public void setSdcAddress(String sdcAddress) { + this.sdcAddress = sdcAddress; } @Override public String getKeyStorePath() { diff --git a/pom.xml b/pom.xml index 9defce1..bb52ff0 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.onap.sdc.sdc-distribution-client sdc-main-distribution-client - 1.4.5-SNAPSHOT + 2.0.0-SNAPSHOT pom sdc-sdc-distribution-client @@ -21,7 +21,6 @@ - @@ -38,7 +37,7 @@ 4.5.13 4.4.15 1.2.11 - 5.9.0 + 5.7.2 1.30 31.1-jre 9.4.48.v20220622 @@ -71,7 +70,6 @@ 3.9.1 3.4.2 0.8.6 - 2.22.2 3.8.1 11 11 @@ -129,7 +127,6 @@ umlgraph ${umlgraph.version} - -views true @@ -140,7 +137,14 @@ maven-checkstyle-plugin - 2.17 + + + onap-java-style + + false + + + checkstyle-suppressions.xml checkstyle.suppressions.file diff --git a/sdc-distribution-ci/etc/asdc-client.jks b/sdc-distribution-ci/etc/asdc-client.jks deleted file mode 100644 index eb0a0d3..0000000 Binary files a/sdc-distribution-ci/etc/asdc-client.jks and /dev/null differ diff --git a/sdc-distribution-ci/etc/asdcclientstore.jks b/sdc-distribution-ci/etc/asdcclientstore.jks deleted file mode 100644 index 5dc006d..0000000 Binary files a/sdc-distribution-ci/etc/asdcclientstore.jks and /dev/null differ diff --git a/sdc-distribution-ci/etc/sdc-client.jks b/sdc-distribution-ci/etc/sdc-client.jks new file mode 100644 index 0000000..eb0a0d3 Binary files /dev/null and b/sdc-distribution-ci/etc/sdc-client.jks differ diff --git a/sdc-distribution-ci/etc/sdcclientstore.jks b/sdc-distribution-ci/etc/sdcclientstore.jks new file mode 100644 index 0000000..5dc006d Binary files /dev/null and b/sdc-distribution-ci/etc/sdcclientstore.jks differ diff --git a/sdc-distribution-ci/pom.xml b/sdc-distribution-ci/pom.xml index e11d428..707de83 100644 --- a/sdc-distribution-ci/pom.xml +++ b/sdc-distribution-ci/pom.xml @@ -7,7 +7,7 @@ org.onap.sdc.sdc-distribution-client sdc-main-distribution-client - 1.4.5-SNAPSHOT + 2.0.0-SNAPSHOT sdc-distribution-ci @@ -25,6 +25,13 @@ + + + org.testcontainers + kafka + ${testcontainers.version} + test + org.onap.sdc.sdc-distribution-client sdc-distribution-client @@ -136,6 +143,11 @@ + com.fasterxml.jackson.core + jackson-annotations + 2.8.2 + + org.mockito mockito-core ${mockito-core.version} @@ -181,6 +193,12 @@ ${httpclient.version} runtime + + org.junit-pioneer + junit-pioneer + 1.4.2 + test + @@ -195,7 +213,7 @@ true lib - org.onap.test.it.RegisterToAsdcTopicIT + org.onap.test.it.RegisterToSdcTopicIT lib/ diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java b/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java index ad18969..5a902c8 100644 --- a/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java +++ b/sdc-distribution-ci/src/main/java/org/onap/test/core/config/DistributionClientConfig.java @@ -26,21 +26,23 @@ import java.util.List; public class DistributionClientConfig implements IConfiguration { - public static final String DEFAULT_ASDC_ADDRESS = "localhost:30206"; + public static final String DEFAULT_SDC_ADDRESS = "localhost:30206"; public static final String DEFAULT_COMSUMER_ID = "dcae-openapi-manager"; public static final String DEFAULT_CONSUMER_GROUP = "noapp"; public static final String DEFAULT_ENVIRONMENT_NAME = "AUTO"; public static final String DEFAULT_PASSWORD = "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U"; public static final int DEFAULT_POLLING_INTERVAL = 20; public static final int DEFAULT_POLLING_TIMEOUT = 20; + public static final String DEFAULT_STATUS_TOPIC = "STATUS-TOPIC"; + public static final String DEFAULT_NOTIF_TOPIC = "NOTIF-TOPIC"; public static final String DEFAULT_USER = "dcae"; - public static final String DEFAULT_KEY_STORE_PATH = "etc/asdc-client.jks"; + public static final String DEFAULT_KEY_STORE_PATH = "etc/sdc-client.jks"; public static final String DEFAULT_KEY_STORE_PASSWORD = "Aa123456"; public static final boolean DEFAULT_ACTIVATE_SERVER_TLS_AUTH = false; public static final boolean DEFAULT_IS_FILTER_IN_EMPTY_RESOURCES = true; public static final boolean DEFAULT_USE_HTTPS_WITH_SDC = false; - public static final String DEFAULT_MSG_BUS_ADDRESS = "localhost"; - private String asdcAddress; + public static final String DEFAULT_MSG_BUS_ADDRESS = "localhost:9092"; + private String sdcAddress; private String user; private String password; private int pollingInterval; @@ -53,38 +55,23 @@ public class DistributionClientConfig implements IConfiguration { private String keyStorePassword; private boolean activateServerTLSAuth; private boolean isFilterInEmptyResources; - private boolean useHttpsWithDmaap; private boolean useHttpsWithSDC; private List msgBusAddress; + private String sdcStatusTopicName; + private String sdcNotificationTopicName; + private String kafkaSecurityProtocolConfig; + private String kafkaSaslMechanism; + private String kafkaSaslJaasConfig; private String httpProxyHost; private int httpProxyPort; private String httpsProxyHost; private int httpsProxyPort; private boolean useSystemProxy; - public DistributionClientConfig(IConfiguration other) { - this.asdcAddress = other.getAsdcAddress(); - this.comsumerID = other.getConsumerID(); - this.consumerGroup = other.getConsumerGroup(); - this.environmentName = other.getEnvironmentName(); - this.password = other.getPassword(); - this.pollingInterval = other.getPollingInterval(); - this.pollingTimeout = other.getPollingTimeout(); - this.relevantArtifactTypes = other.getRelevantArtifactTypes(); - this.user = other.getUser(); - this.keyStorePath = other.getKeyStorePath(); - this.keyStorePassword = other.getKeyStorePassword(); - this.activateServerTLSAuth = other.activateServerTLSAuth(); - this.isFilterInEmptyResources = other.isFilterInEmptyResources(); - this.httpProxyHost = other.getHttpProxyHost(); - this.httpProxyPort = other.getHttpProxyPort(); - this.httpsProxyHost = other.getHttpsProxyHost(); - this.httpsProxyPort = other.getHttpsProxyPort(); - this.useSystemProxy = other.isUseSystemProxy(); - } - public DistributionClientConfig() { - this.asdcAddress = DEFAULT_ASDC_ADDRESS; + this.sdcAddress = DEFAULT_SDC_ADDRESS; + this.sdcStatusTopicName = DEFAULT_STATUS_TOPIC; + this.sdcNotificationTopicName = DEFAULT_NOTIF_TOPIC; this.comsumerID = DEFAULT_COMSUMER_ID; this.consumerGroup = DEFAULT_CONSUMER_GROUP; this.environmentName = DEFAULT_ENVIRONMENT_NAME; @@ -99,22 +86,23 @@ public class DistributionClientConfig implements IConfiguration { this.activateServerTLSAuth = DEFAULT_ACTIVATE_SERVER_TLS_AUTH; this.isFilterInEmptyResources = DEFAULT_IS_FILTER_IN_EMPTY_RESOURCES; this.useHttpsWithSDC = DEFAULT_USE_HTTPS_WITH_SDC; - msgBusAddress = new ArrayList<>(); - msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS); - msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS); - msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS); + this.msgBusAddress = new ArrayList<>(); + this.msgBusAddress.add(DEFAULT_MSG_BUS_ADDRESS); } @Override - public String getAsdcAddress() { - return asdcAddress; + public String getSdcAddress() { + return sdcAddress; } - @Override public List getMsgBusAddress() { return msgBusAddress; } + public void setMsgBusAddress(List msgBusAddress) { + this.msgBusAddress = msgBusAddress; + } + @Override public String getUser() { return user; @@ -173,8 +161,8 @@ public class DistributionClientConfig implements IConfiguration { this.comsumerID = comsumerID; } - public void setAsdcAddress(String asdcAddress) { - this.asdcAddress = asdcAddress; + public void setSdcAddress(String sdcAddress) { + this.sdcAddress = sdcAddress; } public void setUser(String user) { @@ -217,7 +205,7 @@ public class DistributionClientConfig implements IConfiguration { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((asdcAddress == null) ? 0 : asdcAddress.hashCode()); + result = prime * result + ((sdcAddress == null) ? 0 : sdcAddress.hashCode()); result = prime * result + ((comsumerID == null) ? 0 : comsumerID.hashCode()); result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode()); result = prime * result + ((environmentName == null) ? 0 : environmentName.hashCode()); @@ -251,11 +239,11 @@ public class DistributionClientConfig implements IConfiguration { return false; } DistributionClientConfig other = (DistributionClientConfig) obj; - if (asdcAddress == null) { - if (other.asdcAddress != null) { + if (sdcAddress == null) { + if (other.sdcAddress != null) { return false; } - } else if (!asdcAddress.equals(other.asdcAddress)) { + } else if (!sdcAddress.equals(other.sdcAddress)) { return false; } if (comsumerID == null) { @@ -322,7 +310,7 @@ public class DistributionClientConfig implements IConfiguration { @Override public String toString() { - return "TestConfiguration [asdcAddress=" + asdcAddress + ", user=" + user + ", password=" + password + return "TestConfiguration [sdcAddress=" + sdcAddress + ", user=" + user + ", password=" + password + ", pollingInterval=" + pollingInterval + ", pollingTimeout=" + pollingTimeout + ", relevantArtifactTypes=" + relevantArtifactTypes + ", consumerGroup=" + consumerGroup + ", environmentName=" + environmentName + ", comsumerID=" + comsumerID + "]"; @@ -337,11 +325,6 @@ public class DistributionClientConfig implements IConfiguration { this.isFilterInEmptyResources = isFilterInEmptyResources; } - @Override - public Boolean isUseHttpsWithDmaap() { - return this.useHttpsWithDmaap; - } - public Boolean isUseHttpsWithSDC() { return this.useHttpsWithSDC; } diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java b/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java index 4dfe388..4cee4cf 100644 --- a/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java +++ b/sdc-distribution-ci/src/main/java/org/onap/test/core/service/ClientNotifyCallback.java @@ -23,7 +23,7 @@ import org.onap.sdc.api.consumer.IDistributionStatusMessage; import org.onap.sdc.api.consumer.INotificationCallback; import org.onap.sdc.api.notification.INotificationData; import org.onap.sdc.api.notification.IResourceInstance; -import org.onap.sdc.http.HttpAsdcClient; +import org.onap.sdc.http.HttpSdcClient; import org.onap.sdc.http.SdcConnectorClient; import org.onap.sdc.impl.DistributionClientDownloadResultImpl; import org.onap.sdc.impl.DistributionClientImpl; @@ -46,8 +46,8 @@ public class ClientNotifyCallback implements INotificationCallback { private final DistributionClientImpl distributionClient; private final List pulledArtifacts = new ArrayList<>(); DistributionClientConfig config = new DistributionClientConfig(); - HttpAsdcClient asdcClient = new HttpAsdcClient(config); - SdcConnectorClient sdcConnectorClient = new SdcConnectorClient(config, asdcClient); + HttpSdcClient sdcClient = new HttpSdcClient(config); + SdcConnectorClient sdcConnectorClient = new SdcConnectorClient(config, sdcClient); ArtifactsDownloader artifactsDownloader = new ArtifactsDownloader("/app/path", sdcConnectorClient); public List getPulledArtifacts() { diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java b/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java deleted file mode 100644 index 58baec7..0000000 --- a/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToAsdcTopicIT.java +++ /dev/null @@ -1,51 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * sdc-distribution-client - * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.test.it; - -import org.onap.sdc.impl.DistributionClientImpl; -import org.onap.test.core.config.DistributionClientConfig; -import org.onap.test.core.service.ArtifactsValidator; -import org.onap.test.core.service.ClientInitializer; -import org.onap.test.core.service.ClientNotifyCallback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; - -public class RegisterToAsdcTopicIT { - - - public static void main(String[] args) { - DistributionClientConfig clientConfig = new DistributionClientConfig(); - List validators = new ArrayList<>(); - DistributionClientImpl client = new DistributionClientImpl(); - ClientNotifyCallback callback = new ClientNotifyCallback(validators, client); - ClientInitializer clientInitializer = new ClientInitializer(clientConfig, callback, client); - clientInitializer.initialize(); - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - client.stop(); - System.out.println("Shutdown Hook is running !"); - } - }); - - } -} diff --git a/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java b/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java new file mode 100644 index 0000000..c89ecd4 --- /dev/null +++ b/sdc-distribution-ci/src/main/java/org/onap/test/it/RegisterToSdcTopicIT.java @@ -0,0 +1,46 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.test.it; + +import java.util.ArrayList; +import java.util.List; +import org.onap.sdc.impl.DistributionClientImpl; +import org.onap.test.core.config.DistributionClientConfig; +import org.onap.test.core.service.ArtifactsValidator; +import org.onap.test.core.service.ClientInitializer; +import org.onap.test.core.service.ClientNotifyCallback; + +public class RegisterToSdcTopicIT { + + + public static void main(String[] args) { + DistributionClientConfig clientConfig = new DistributionClientConfig(); + List validators = new ArrayList<>(); + DistributionClientImpl client = new DistributionClientImpl(); + ClientNotifyCallback callback = new ClientNotifyCallback(validators, client); + ClientInitializer clientInitializer = new ClientInitializer(clientConfig, callback, client); + clientInitializer.initialize(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + client.stop(); + System.out.println("Shutdown Hook is running !"); + })); + + } +} diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java index ba118c0..1abef1f 100644 --- a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java +++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/ClientInitializerTest.java @@ -24,18 +24,35 @@ import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.verify; import java.io.IOException; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.nio.file.Paths; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; -import org.awaitility.Durations; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.SaslConfigs; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junitpioneer.jupiter.SetEnvironmentVariable; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; @@ -46,84 +63,52 @@ import org.onap.test.core.config.DistributionClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; - +import org.testcontainers.shaded.org.awaitility.Durations; +import org.testcontainers.utility.DockerImageName; @Testcontainers @ExtendWith(MockitoExtension.class) +@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';") +@SetEnvironmentVariable(key = "SECURITY_PROTOCOL", value = "SASL_PLAINTEXT") +@SetEnvironmentVariable(key = "SASL_MECHANISM", value = "PLAIN") class ClientInitializerTest { - private static final int SUCCESSFUL_STOP_MSG_INDEX = 2; - private static final int SUCCESSFUL_UNREGISTER_MSG_INDEX = 3; - private static final int SUCCESSFUL_INIT_MSG_INDEX = 0; - private static final int SUCCESSFUL_DIST_MSG_INDEX = 3; private static final int EXPECTED_HEAT_ARTIFACTS = 4; + private static final DistributionClientConfig clientConfig = new DistributionClientConfig(); private static final Logger testLog = LoggerFactory.getLogger(ClientInitializerTest.class); @Container - public GenericContainer mockDmaap = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-dmaap:latest") - .withNetworkMode("host"); + CustomKafkaContainer kafka = buildBrokerInstance(); @Container - public GenericContainer mockSdc = new GenericContainer("registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:latest") - .withNetworkMode("host"); - @Mock - private Logger log; + public GenericContainer mockSdc = + new GenericContainer<>( + "registry.gitlab.com/orange-opensource/lfn/onap/mock_servers/mock-sdc:develop") + .withExposedPorts(30206); @Mock private Logger distClientLog; private ClientInitializer clientInitializer; private ClientNotifyCallback clientNotifyCallback; @BeforeEach - public void initializeClient() { - DistributionClientConfig clientConfig = new DistributionClientConfig(); + public void initializeClient() throws InterruptedException { + clientConfig.setSdcAddress(mockSdc.getHost()+":"+mockSdc.getFirstMappedPort()); List validators = new ArrayList<>(); DistributionClientImpl client = new DistributionClientImpl(distClientLog); clientNotifyCallback = new ClientNotifyCallback(validators, client); clientInitializer = new ClientInitializer(clientConfig, clientNotifyCallback, client); - } - - @Test - void shouldRegisterToDmaapAfterClientInitialization() { - //given - final ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(String.class); - //when - clientInitializer.log = log; - clientInitializer.initialize(); - verify(log, Mockito.atLeastOnce()).info(exceptionCaptor.capture()); - List allValues = exceptionCaptor.getAllValues(); - //then - assertThat(allValues.get(SUCCESSFUL_INIT_MSG_INDEX)).isEqualTo("distribution client initialized successfuly"); - assertThat(allValues.get(SUCCESSFUL_DIST_MSG_INDEX)).isEqualTo("distribution client started successfuly"); - } - - @Test - void shouldUnregisterAndStopClient() { - //given - final ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(String.class); - //when clientInitializer.initialize(); - clientInitializer.stop(); - verify(distClientLog, Mockito.atLeastOnce()).info(exceptionCaptor.capture()); - List allValues = exceptionCaptor.getAllValues(); - //then - assertThat(allValues.get(SUCCESSFUL_STOP_MSG_INDEX)).isEqualTo("stop DistributionClient"); - assertThat(allValues.get(SUCCESSFUL_UNREGISTER_MSG_INDEX)).isEqualTo("client unregistered from topics successfully"); + Thread.sleep(1000); + setUpTopicsAndSendData(); } @Test - void shouldDownloadArtifactsWithArtifactTypeHeat() throws IOException, InterruptedException { - - //given - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create("http://localhost:3904/events/testName/add")) - .POST(HttpRequest.BodyPublishers.ofFile(Paths.get("src/test/resources/artifacts.json"))) - .build(); - HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); - //when - clientInitializer.initialize(); + void shouldDownloadArtifactsWithArtifactTypeHeat() { waitForArtifacts(); List calls = clientNotifyCallback.getPulledArtifacts(); - //then Assertions.assertEquals(EXPECTED_HEAT_ARTIFACTS, calls.size()); + clientInitializer.stop(); } private void waitForArtifacts() { @@ -132,4 +117,36 @@ class ClientInitializerTest { .atMost(Durations.ONE_MINUTE) .until(() -> !clientNotifyCallback.getPulledArtifacts().isEmpty()); } + + private CustomKafkaContainer buildBrokerInstance() { + final Map env = new LinkedHashMap<>(); + env.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT"); + env.put("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN"); + env.put("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"admin\" " + + "password=\"admin-secret\" " + + "user_admin=\"admin-secret\" " + + "user_producer=\"producer-secret\" " + + "user_consumer=\"consumer-secret\";"); + + return new CustomKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + .withEmbeddedZookeeper() + .withStartupAttempts(1) + .withEnv(env) + .withFixedExposedPort(43219, 9093); + } + + @SneakyThrows + private void setUpTopicsAndSendData() { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + KafkaProducer producer = new KafkaProducer<>(props); + String content = Files.readString(Path.of("src/test/resources/artifacts.json")); + producer.send(new ProducerRecord<>("SDC-DIST-NOTIF-TOPIC", "testcontainers", content)).get(); + } } diff --git a/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java new file mode 100644 index 0000000..e2eabc1 --- /dev/null +++ b/sdc-distribution-ci/src/test/java/org/onap/test/core/service/CustomKafkaContainer.java @@ -0,0 +1,94 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.test.core.service; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import lombok.SneakyThrows; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.utility.DockerImageName; + +public class CustomKafkaContainer extends FixedHostPortGenericContainer { + protected String externalZookeeperConnect; + + public CustomKafkaContainer(DockerImageName dockerImageName) { + super(String.valueOf(dockerImageName)); + this.externalZookeeperConnect = null; + this.withExposedPorts(9093); + this.withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092"); + this.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); + this.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + this.withEnv("KAFKA_BROKER_ID", "1"); + this.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"); + this.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1"); + this.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1"); + this.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1"); + this.withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "9223372036854775807"); + this.withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); + } + + public CustomKafkaContainer withEmbeddedZookeeper() { + this.externalZookeeperConnect = null; + return this.self(); + } + + public String getBootstrapServers() { + return String.format("PLAINTEXT://%s:%s", this.getHost(), this.getMappedPort(9093)); + } + + protected void configure() { + this.withEnv("KAFKA_ADVERTISED_LISTENERS", String.format("BROKER://%s:9092", this.getNetwork() != null ? this.getNetworkAliases().get(0) : "localhost")); + String command = ""; + if (this.externalZookeeperConnect != null) { + this.withEnv("KAFKA_ZOOKEEPER_CONNECT", this.externalZookeeperConnect); + } else { + this.addExposedPort(2181); + this.withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181"); + command = command + "echo 'clientPort=2181' > zookeeper.properties\n"; + command = command + "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n"; + command = command + "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n"; + command = command + "zookeeper-server-start zookeeper.properties &\n"; + } + + command = command + "echo '' > /etc/confluent/docker/ensure \n"; + command = command + "/etc/confluent/docker/run \n"; + this.withCommand("sh", "-c", command); + } + + @SneakyThrows + protected void containerIsStarted(InspectContainerResponse containerInfo) { + try { + String brokerAdvertisedListener = this.brokerAdvertisedListener(containerInfo); + ExecResult result = this.execInContainer("kafka-configs", "--alter", "--bootstrap-server", + brokerAdvertisedListener, "--entity-type", "brokers", "--entity-name", + this.getEnvMap().get("KAFKA_BROKER_ID"), "--add-config", + "advertised.listeners=[" + String.join(",", this.getBootstrapServers(), brokerAdvertisedListener) + "]"); + if (result.getExitCode() != 0) { + throw new IllegalStateException(result.toString()); + } + } catch (Throwable var4) { + throw var4; + } + } + + protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) { + return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092"); + } +} diff --git a/sdc-distribution-ci/src/test/resources/logback-test.xml b/sdc-distribution-ci/src/test/resources/logback-test.xml new file mode 100644 index 0000000..78c5b05 --- /dev/null +++ b/sdc-distribution-ci/src/test/resources/logback-test.xml @@ -0,0 +1,14 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n + + + + + + + + + + \ No newline at end of file diff --git a/sdc-distribution-client/etc/asdc-client.jks b/sdc-distribution-client/etc/asdc-client.jks deleted file mode 100644 index eb0a0d3..0000000 Binary files a/sdc-distribution-client/etc/asdc-client.jks and /dev/null differ diff --git a/sdc-distribution-client/etc/asdcclientstore.jks b/sdc-distribution-client/etc/asdcclientstore.jks deleted file mode 100644 index 5dc006d..0000000 Binary files a/sdc-distribution-client/etc/asdcclientstore.jks and /dev/null differ diff --git a/sdc-distribution-client/etc/sdc-client.jks b/sdc-distribution-client/etc/sdc-client.jks new file mode 100644 index 0000000..eb0a0d3 Binary files /dev/null and b/sdc-distribution-client/etc/sdc-client.jks differ diff --git a/sdc-distribution-client/etc/sdcclientstore.jks b/sdc-distribution-client/etc/sdcclientstore.jks new file mode 100644 index 0000000..5dc006d Binary files /dev/null and b/sdc-distribution-client/etc/sdcclientstore.jks differ diff --git a/sdc-distribution-client/pom.xml b/sdc-distribution-client/pom.xml index ef0911c..3d1ebbd 100644 --- a/sdc-distribution-client/pom.xml +++ b/sdc-distribution-client/pom.xml @@ -3,17 +3,10 @@ 4.0.0 - - 2.8.0 - 2.8.9 - 0.0.1 - 1.18.24 - - org.onap.sdc.sdc-distribution-client sdc-main-distribution-client - 1.4.5-SNAPSHOT + 2.0.0-SNAPSHOT sdc-distribution-client @@ -21,7 +14,38 @@ Distribution client JAR file to use by consumers jar + + 3.18.1 + 3.6.28 + 2.8.0 + 1.7.30 + 2.5.2 + 3.3.1 + 2.8.9 + 1.18.24 + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackson.version} + org.projectlombok lombok @@ -36,13 +60,6 @@ org.slf4j slf4j-api ${slf4j-api.version} - - - - com.att.nsa - cambriaClient - ${cambriaClient.version} - compile org.json @@ -128,7 +145,6 @@ ${jetty.version} - org.eclipse.jetty jetty-webapp @@ -141,7 +157,6 @@ - org.junit.jupiter junit-jupiter @@ -161,13 +176,18 @@ ${mockito.version} test + + org.junit-pioneer + junit-pioneer + 1.4.2 + test + org.mockito mockito-core ${mockito.version} test - com.google.code.bean-matchers bean-matchers @@ -180,26 +200,35 @@ - org.assertj assertj-core ${assertj-core.version} test - - org.awaitility - awaitility - ${awaitility.version} + io.github.hakky54 + logcaptor + 2.7.10 test + + + com.salesforce.kafka.test + kafka-junit5 + 3.2.4 - objenesis - org.objenesis + org.apache.kafka + kafka-streams + test + + + org.apache.kafka + kafka_2.13 + ${kafka.version} + test - diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/IDistributionClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/IDistributionClient.java index fe37dc1..ac48419 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/IDistributionClient.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/api/IDistributionClient.java @@ -33,6 +33,11 @@ import org.onap.sdc.api.results.IDistributionClientResult; import org.onap.sdc.api.notification.IArtifactInfo; import org.onap.sdc.api.notification.IVfModuleMetadata; +/** + Client for sending/receiving notifications/status related to distributions from SDC. + This client uses Kafka for communication with the topics. + For communication using DMAAP MR use latest version with major version = 1 (e.g. 1.4.5) + **/ public interface IDistributionClient { /** @@ -63,15 +68,13 @@ public interface IDistributionClient { /** * Stop distribution client
* - stop polling notification topic
- * - unregister topics (via ASDC)
- * - delete keys from UEB * * @return IDistributionClientResult */ IDistributionClientResult stop(); /** - * Downloads an artifact from ASDC Catalog
+ * Downloads an artifact from SDC Catalog
* * @param artifactInfo * @return IDistributionClientDownloadResult @@ -80,10 +83,10 @@ public interface IDistributionClient { /** * Initialize the distribution client
- * - fetch the UEB server list from ASDC
- * - create keys in UEB
- * - register for topics (via ASDC)
+ * - get MessageBus server list from configuration
+ * - validate artifact types against sdc server
* - set the notification callback
+ * - set up notification sender
*

* Note: all configuration fields are mandatory.
* Password must be in clear text and not encrypted
@@ -98,10 +101,10 @@ public interface IDistributionClient { /** * Initialize the distribution client
- * - fetch the UEB server list from ASDC
- * - create keys in UEB
- * - register for topics (via ASDC)
+ * - get MessageBus server list from configuration
+ * - validate artifact types against sdc server
* - set the notification callback
+ * - set up notification sender
*

* Note: all configuration fields are mandatory.
* Password must be in clear text and not encrypted
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/RegistrationRequest.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/RegistrationRequest.java deleted file mode 100644 index 765da4c..0000000 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/RegistrationRequest.java +++ /dev/null @@ -1,56 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * sdc-distribution-client - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.sdc.api.asdc; - -import java.util.List; - -public class RegistrationRequest { - - private String apiPublicKey; - private String distrEnvName; - private Boolean isConsumerToSdcDistrStatusTopic; - private List distEnvEndPoints; - - public RegistrationRequest(String apiPublicKey, String distrEnvName, boolean isConsumerToSdcDistrStatusTopic, List distEnvEndPoints) { - this.apiPublicKey = apiPublicKey; - this.distrEnvName = distrEnvName; - this.isConsumerToSdcDistrStatusTopic = isConsumerToSdcDistrStatusTopic; - this.distEnvEndPoints = distEnvEndPoints; - } - - public String getApiPublicKey() { - return apiPublicKey; - } - - public String getDistrEnvName() { - return distrEnvName; - } - - public Boolean getIsConsumerToSdcDistrStatusTopic() { - return isConsumerToSdcDistrStatusTopic; - } - - public List getDistEnvEndPoints() { - return distEnvEndPoints; - } - - -} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/ServerListResponse.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/ServerListResponse.java deleted file mode 100644 index b0cfb1e..0000000 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/asdc/ServerListResponse.java +++ /dev/null @@ -1,35 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * sdc-distribution-client - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.sdc.api.asdc; - -import java.util.List; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -@Getter -@Setter -@NoArgsConstructor -public class ServerListResponse { - - private List uebServerList; - -} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java index e3013e2..84eb42b 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java @@ -22,6 +22,7 @@ package org.onap.sdc.api.consumer; import java.util.List; +import org.apache.kafka.common.KafkaException; import org.onap.sdc.api.notification.INotificationData; public interface IConfiguration { @@ -30,7 +31,7 @@ public interface IConfiguration { * without port), IP:port or FQDN (Fully Qualified Domain Name). * @return SDC * Distribution Engine address. */ - String getAsdcAddress(); + String getSdcAddress(); /** * SDC Distribution Addresses from ONAP Component Values need to be set from @@ -38,6 +39,32 @@ public interface IConfiguration { */ List getMsgBusAddress(); + /** + * Kafka security.protocol + */ + default String getKafkaSecurityProtocolConfig() { + return System.getenv().getOrDefault("SECURITY_PROTOCOL", "SASL_PLAINTEXT"); + } + + /** + * Kafka sasl.mechanism + */ + default String getKafkaSaslMechanism() { + return System.getenv().getOrDefault("SASL_MECHANISM", "SCRAM-SHA-512"); + } + + /** + * Kafka sasl.jaas.config + */ + default String getKafkaSaslJaasConfig() { + String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); + if(saslJaasConfFromEnv != null) { + return saslJaasConfFromEnv; + } else { + throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); + } + } + /** * User Name for SDC distribution consumer authentication. * @@ -64,7 +91,7 @@ public interface IConfiguration { String getPassword(); /** - * Distribution Client Polling Interval towards UEB in seconds. Can Be + * Distribution Client Polling Interval towards messaging bus in seconds. Can Be * reconfigured in runtime. * * @return Distribution Client Polling Interval. @@ -72,7 +99,7 @@ public interface IConfiguration { int getPollingInterval(); /** - * Distribution Client Timeout in seconds waiting to UEB server response in each + * Distribution Client Timeout in seconds waiting for messaging bus server response in each * fetch interval. Can Be reconfigured in runtime. * * @return Distribution Client Timeout in seconds. @@ -89,10 +116,10 @@ public interface IConfiguration { List getRelevantArtifactTypes(); /** - * Returns the consumer group defined for this ECOMP component, if no consumer + * Returns the consumer group defined for this component, if no consumer * group is defined return null. * - * @return Consumer group. + * @return SdcKafkaConsumer group. */ String getConsumerGroup(); @@ -105,7 +132,7 @@ public interface IConfiguration { String getEnvironmentName(); /** - * Unique ID of ECOMP component instance (e.x INSTAR name). + * Unique ID of component instance (e.x INSTAR name). * * @return */ @@ -113,7 +140,7 @@ public interface IConfiguration { /** * Return full path to Client's Key Store that contains either CA certificate or - * the ASDC's public key (e.g /etc/keystore/asdc-client.jks) file will be + * the SDC's public key (e.g /etc/keystore/sdc-client.jks) file will be * deployed with sdc-distribution jar. * * @return @@ -146,15 +173,6 @@ public interface IConfiguration { */ boolean isFilterInEmptyResources(); - /** - * By default, Distribution Client will use HTTPS (TLS 1.2) when connecting to - * DMAAP. This param can be null, then default (HTTPS) behavior will be applied. - * If set to false, distribution client will use HTTP when connecting to DMAAP. - * - * @return - */ - Boolean isUseHttpsWithDmaap(); - /** * By default, (false value) Distribution Client will trigger the regular * registration towards SDC (register component as consumer to the diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/INotificationData.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/INotificationData.java index 5df130a..8c24ed4 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/INotificationData.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/INotificationData.java @@ -25,7 +25,7 @@ import java.util.List; public interface INotificationData { /** - * Global Distribution Identifier: UUID generated by ASDC per each distribution activation.
+ * Global Distribution Identifier: UUID generated by SDC per each distribution activation.
* Generated UUID is compliant with RFC 4122.
* It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").
* Ex.: AA97B177-9383-4934-8543-0F91A7A02836 @@ -45,7 +45,7 @@ public interface INotificationData { String getServiceVersion(); /** - * Global UUID generated by ASDC per each service version. Generated UUID is compliant with RFC 4122.
+ * Global UUID generated by SDC per each service version. Generated UUID is compliant with RFC 4122.
* It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").
* Ex. : AA97B177-9383-4934-8543-0F91A7A02836 */ diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/IVfModuleMetadata.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/IVfModuleMetadata.java index 5347189..91415ce 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/IVfModuleMetadata.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/api/notification/IVfModuleMetadata.java @@ -45,7 +45,7 @@ public interface IVfModuleMetadata { /** * Global UUID of the VF Module.
- * It is generated by ASDC per each new VF module version. Generated UUID is compliant with RFC 4122. It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").
+ * It is generated by SDC per each new VF module version. Generated UUID is compliant with RFC 4122. It is a 128-bit value formatted into blocks of hexadecimal digits separated by a hyphen ("-").
* Ex.: AA97B177-9383-4934-8543-0F91A7A02836 */ String getVfModuleModelUUID(); diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/AsdcUrls.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/AsdcUrls.java deleted file mode 100644 index b1d51d9..0000000 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/AsdcUrls.java +++ /dev/null @@ -1,34 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * sdc-distribution-client - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.sdc.http; - -public class AsdcUrls { - - private AsdcUrls() { - - } - - public static final String GET_CLUSTER_SERVER_LIST = "/sdc/v1/distributionUebCluster"; - public static final String GET_VALID_ARTIFACT_TYPES = "/sdc/v1/artifactTypes"; - public static final String POST_FOR_TOPIC_REGISTRATION = "/sdc/v1/registerForDistribution"; - public static final String POST_FOR_UNREGISTER = "/sdc/v1/unRegisterForDistribution"; - -} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClient.java deleted file mode 100644 index 14c9c7f..0000000 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClient.java +++ /dev/null @@ -1,177 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * sdc-distribution-client - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Modifications copyright (C) 2020 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.sdc.http; - -import org.apache.http.Header; -import org.apache.http.HttpEntity; -import org.apache.http.HttpStatus; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.onap.sdc.api.consumer.IConfiguration; -import org.onap.sdc.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.UnknownHostException; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; - -public class HttpAsdcClient implements IHttpAsdcClient { - - private static final Logger log = LoggerFactory.getLogger(HttpAsdcClient.class.getName()); - private static final boolean ALWAYS_CLOSE_THE_REQUEST_CONNECTION = true; - private final CloseableHttpClient httpClient; - private final String httpSchema; - private final String serverFqdn; - private final HttpRequestFactory httpRequestFactory; - - /** - * Constructor - * - * @deprecated - * This constructor will be removed in the future. - * - * @param configuration Asdc client configuration - */ - @Deprecated - public HttpAsdcClient(IConfiguration configuration) { - this(configuration.getAsdcAddress(), - new HttpClientFactory(configuration), - new HttpRequestFactory(configuration.getUser(), configuration.getPassword()) - ); - } - - public HttpAsdcClient(String asdcAddress, HttpClientFactory httpClientFactory, HttpRequestFactory httpRequestFactory) { - this.serverFqdn = asdcAddress; - this.httpRequestFactory = httpRequestFactory; - - Pair httpClientPair = httpClientFactory.createInstance(); - this.httpSchema = httpClientPair.getFirst(); - this.httpClient = httpClientPair.getSecond(); - } - - public HttpAsdcResponse postRequest(String requestUrl, HttpEntity entity, Map headersMap) { - return postRequest(requestUrl, entity, headersMap, ALWAYS_CLOSE_THE_REQUEST_CONNECTION).getFirst(); - } - - public Pair postRequest(String requestUrl, HttpEntity entity, Map headersMap, boolean closeTheRequest) { - Pair ret; - final String url = resolveUrl(requestUrl); - log.debug("url to send {}", url); - HttpPost httpPost = httpRequestFactory.createHttpPostRequest(url, headersMap, entity); - - CloseableHttpResponse httpResponse = null; - HttpAsdcResponse response = null; - try { - httpResponse = httpClient.execute(httpPost); - response = new HttpAsdcResponse(httpResponse.getStatusLine().getStatusCode(), httpResponse.getEntity()); - } catch (IOException e) { - log.error("failed to send request to url: {}", requestUrl); - response = createHttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, "failed to send request"); - } finally { - ret = finalizeHttpRequest(closeTheRequest, httpResponse, response); - } - - return ret; - } - - public HttpAsdcResponse getRequest(String requestUrl, Map headersMap) { - return getRequest(requestUrl, headersMap, ALWAYS_CLOSE_THE_REQUEST_CONNECTION).getFirst(); - } - - public Pair getRequest(String requestUrl, Map headersMap, boolean closeTheRequest) { - Pair ret; - - final String url = resolveUrl(requestUrl); - log.debug("url to send {}", url); - HttpGet httpGet = httpRequestFactory.createHttpGetRequest(url, headersMap); - - CloseableHttpResponse httpResponse = null; - HttpAsdcResponse response = null; - try { - httpResponse = httpClient.execute(httpGet); - - log.debug("GET Response Status {}", httpResponse.getStatusLine().getStatusCode()); - Header[] headersRes = httpResponse.getAllHeaders(); - Map headersResMap = new HashMap<>(); - for (Header header : headersRes) { - headersResMap.put(header.getName(), header.getValue()); - } - response = new HttpAsdcResponse(httpResponse.getStatusLine().getStatusCode(), httpResponse.getEntity(), headersResMap); - - } catch (UnknownHostException | ConnectException e) { - log.error("failed to connect to url: {}", requestUrl, e); - response = createHttpResponse(HttpStatus.SC_BAD_GATEWAY, "failed to connect"); - } catch (IOException e) { - log.error("failed to send request to url: {} error {}", requestUrl, e.getMessage()); - response = createHttpResponse(HttpStatus.SC_BAD_GATEWAY, "failed to send request " + e.getMessage()); - } finally { - ret = finalizeHttpRequest(closeTheRequest, httpResponse, response); - } - - return ret; - } - - String getHttpSchema(){ - return this.httpSchema; - } - - private String resolveUrl(String requestUrl) { - return this.httpSchema + serverFqdn + requestUrl; - } - - private Pair finalizeHttpRequest(boolean closeTheRequest, CloseableHttpResponse httpResponse, HttpAsdcResponse response) { - Pair ret; - if (closeTheRequest) { - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - log.error("failed to close http response"); - } - } - ret = new Pair<>(response, null); - } else { - ret = new Pair<>(response, httpResponse); - } - - return ret; - } - - static HttpAsdcResponse createHttpResponse(int httpStatusCode, String httpMessage) { - return new HttpAsdcResponse(httpStatusCode, new StringEntity(httpMessage, StandardCharsets.UTF_8)); - } - - public void closeHttpClient() { - try { - httpClient.close(); - } catch (IOException e) { - log.error("failed to close http client"); - } - } -} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClientException.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClientException.java deleted file mode 100644 index 8d6a527..0000000 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcClientException.java +++ /dev/null @@ -1,27 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * sdc-distribution-client - * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.sdc.http; - -public class HttpAsdcClientException extends RuntimeException { - - public HttpAsdcClientException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcResponse.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcResponse.java deleted file mode 100644 index 8b85684..0000000 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpAsdcResponse.java +++ /dev/null @@ -1,71 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * sdc-distribution-client - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.sdc.http; - -import java.util.Map; - -import org.apache.http.HttpEntity; - -public class HttpAsdcResponse { - - private int status; - private HttpEntity message; - private Map headersMap; - - public HttpAsdcResponse(int status, HttpEntity message) { - super(); - this.status = status; - this.message = message; - } - - public HttpAsdcResponse(int status, HttpEntity message, Map headersMap) { - super(); - this.status = status; - this.message = message; - this.headersMap = headersMap; - } - - public Map getHeadersMap() { - return headersMap; - } - - public void setHeadersMap(Map headersMap) { - this.headersMap = headersMap; - } - - public int getStatus() { - return status; - } - - public void setStatus(int status) { - this.status = status; - } - - public HttpEntity getMessage() { - return message; - } - - public void setMessage(HttpEntity message) { - this.message = message; - } - - -} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpClientFactory.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpClientFactory.java index 7a073a1..94e20fb 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpClientFactory.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpClientFactory.java @@ -182,7 +182,7 @@ public class HttpClientFactory { return HttpClientBuilder.create().setDefaultCredentialsProvider(credsProvider).setProxy(getHttpsProxyHost()) .setSSLSocketFactory(sslsf).build(); } catch (Exception e) { - throw new HttpAsdcClientException("Failed to create https client", e); + throw new HttpSdcClientException("Failed to create https client", e); } } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClient.java new file mode 100644 index 0000000..8b6ee0a --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClient.java @@ -0,0 +1,177 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Modifications copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.http; + +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.onap.sdc.api.consumer.IConfiguration; +import org.onap.sdc.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class HttpSdcClient implements IHttpSdcClient { + + private static final Logger log = LoggerFactory.getLogger(HttpSdcClient.class.getName()); + private static final boolean ALWAYS_CLOSE_THE_REQUEST_CONNECTION = true; + private final CloseableHttpClient httpClient; + private final String httpSchema; + private final String serverFqdn; + private final HttpRequestFactory httpRequestFactory; + + /** + * Constructor + * + * @deprecated + * This constructor will be removed in the future. + * + * @param configuration Sdc client configuration + */ + @Deprecated + public HttpSdcClient(IConfiguration configuration) { + this(configuration.getSdcAddress(), + new HttpClientFactory(configuration), + new HttpRequestFactory(configuration.getUser(), configuration.getPassword()) + ); + } + + public HttpSdcClient(String sdcAddress, HttpClientFactory httpClientFactory, HttpRequestFactory httpRequestFactory) { + this.serverFqdn = sdcAddress; + this.httpRequestFactory = httpRequestFactory; + + Pair httpClientPair = httpClientFactory.createInstance(); + this.httpSchema = httpClientPair.getFirst(); + this.httpClient = httpClientPair.getSecond(); + } + + public HttpSdcResponse postRequest(String requestUrl, HttpEntity entity, Map headersMap) { + return postRequest(requestUrl, entity, headersMap, ALWAYS_CLOSE_THE_REQUEST_CONNECTION).getFirst(); + } + + public Pair postRequest(String requestUrl, HttpEntity entity, Map headersMap, boolean closeTheRequest) { + Pair ret; + final String url = resolveUrl(requestUrl); + log.debug("url to send {}", url); + HttpPost httpPost = httpRequestFactory.createHttpPostRequest(url, headersMap, entity); + + CloseableHttpResponse httpResponse = null; + HttpSdcResponse response = null; + try { + httpResponse = httpClient.execute(httpPost); + response = new HttpSdcResponse(httpResponse.getStatusLine().getStatusCode(), httpResponse.getEntity()); + } catch (IOException e) { + log.error("failed to send request to url: {}", requestUrl); + response = createHttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, "failed to send request"); + } finally { + ret = finalizeHttpRequest(closeTheRequest, httpResponse, response); + } + + return ret; + } + + public HttpSdcResponse getRequest(String requestUrl, Map headersMap) { + return getRequest(requestUrl, headersMap, ALWAYS_CLOSE_THE_REQUEST_CONNECTION).getFirst(); + } + + public Pair getRequest(String requestUrl, Map headersMap, boolean closeTheRequest) { + Pair ret; + + final String url = resolveUrl(requestUrl); + log.debug("url to send {}", url); + HttpGet httpGet = httpRequestFactory.createHttpGetRequest(url, headersMap); + + CloseableHttpResponse httpResponse = null; + HttpSdcResponse response = null; + try { + httpResponse = httpClient.execute(httpGet); + + log.debug("GET Response Status {}", httpResponse.getStatusLine().getStatusCode()); + Header[] headersRes = httpResponse.getAllHeaders(); + Map headersResMap = new HashMap<>(); + for (Header header : headersRes) { + headersResMap.put(header.getName(), header.getValue()); + } + response = new HttpSdcResponse(httpResponse.getStatusLine().getStatusCode(), httpResponse.getEntity(), headersResMap); + + } catch (UnknownHostException | ConnectException e) { + log.error("failed to connect to url: {}", requestUrl, e); + response = createHttpResponse(HttpStatus.SC_BAD_GATEWAY, "failed to connect"); + } catch (IOException e) { + log.error("failed to send request to url: {} error {}", requestUrl, e.getMessage()); + response = createHttpResponse(HttpStatus.SC_BAD_GATEWAY, "failed to send request " + e.getMessage()); + } finally { + ret = finalizeHttpRequest(closeTheRequest, httpResponse, response); + } + + return ret; + } + + String getHttpSchema(){ + return this.httpSchema; + } + + private String resolveUrl(String requestUrl) { + return this.httpSchema + serverFqdn + requestUrl; + } + + private Pair finalizeHttpRequest(boolean closeTheRequest, CloseableHttpResponse httpResponse, HttpSdcResponse response) { + Pair ret; + if (closeTheRequest) { + if (httpResponse != null) { + try { + httpResponse.close(); + } catch (IOException e) { + log.error("failed to close http response"); + } + } + ret = new Pair<>(response, null); + } else { + ret = new Pair<>(response, httpResponse); + } + + return ret; + } + + static HttpSdcResponse createHttpResponse(int httpStatusCode, String httpMessage) { + return new HttpSdcResponse(httpStatusCode, new StringEntity(httpMessage, StandardCharsets.UTF_8)); + } + + public void closeHttpClient() { + try { + httpClient.close(); + } catch (IOException e) { + log.error("failed to close http client"); + } + } +} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClientException.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClientException.java new file mode 100644 index 0000000..d3747f5 --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcClientException.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.sdc.http; + +public class HttpSdcClientException extends RuntimeException { + + public HttpSdcClientException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcResponse.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcResponse.java new file mode 100644 index 0000000..ad64f3f --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/HttpSdcResponse.java @@ -0,0 +1,71 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.http; + +import java.util.Map; + +import org.apache.http.HttpEntity; + +public class HttpSdcResponse { + + private int status; + private HttpEntity message; + private Map headersMap; + + public HttpSdcResponse(int status, HttpEntity message) { + super(); + this.status = status; + this.message = message; + } + + public HttpSdcResponse(int status, HttpEntity message, Map headersMap) { + super(); + this.status = status; + this.message = message; + this.headersMap = headersMap; + } + + public Map getHeadersMap() { + return headersMap; + } + + public void setHeadersMap(Map headersMap) { + this.headersMap = headersMap; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public HttpEntity getMessage() { + return message; + } + + public void setMessage(HttpEntity message) { + this.message = message; + } + + +} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpAsdcClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpAsdcClient.java deleted file mode 100644 index 2a0d9a0..0000000 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpAsdcClient.java +++ /dev/null @@ -1,35 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * sdc-distribution-client - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.sdc.http; - -import java.util.Map; - -import org.apache.http.HttpEntity; - -public interface IHttpAsdcClient { - - HttpAsdcResponse postRequest(String requestUrl, HttpEntity entity, Map headersMap); - - HttpAsdcResponse getRequest(String requestUrl, Map headersMap); - - void closeHttpClient(); - -} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpSdcClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpSdcClient.java new file mode 100644 index 0000000..9adce23 --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/IHttpSdcClient.java @@ -0,0 +1,35 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.http; + +import java.util.Map; + +import org.apache.http.HttpEntity; + +public interface IHttpSdcClient { + + HttpSdcResponse postRequest(String requestUrl, HttpEntity entity, Map headersMap); + + HttpSdcResponse getRequest(String requestUrl, Map headersMap); + + void closeHttpClient(); + +} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java index 7aac780..18aba95 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java @@ -21,6 +21,10 @@ package org.onap.sdc.http; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import fj.data.Either; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Type; @@ -30,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; - import org.apache.commons.io.IOUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; @@ -38,33 +41,26 @@ import org.apache.http.HttpStatus; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; +import org.onap.sdc.api.consumer.IConfiguration; import org.onap.sdc.api.notification.IArtifactInfo; import org.onap.sdc.api.results.IDistributionClientResult; +import org.onap.sdc.impl.DistributionClientDownloadResultImpl; import org.onap.sdc.impl.DistributionClientResultImpl; import org.onap.sdc.utils.DistributionActionResultEnum; -import org.onap.sdc.api.asdc.RegistrationRequest; -import org.onap.sdc.api.consumer.IConfiguration; -import org.onap.sdc.impl.DistributionClientDownloadResultImpl; import org.onap.sdc.utils.DistributionClientConstants; import org.onap.sdc.utils.Pair; +import org.onap.sdc.utils.kafka.KafkaDataResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.apiClient.credentials.ApiCredential; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.reflect.TypeToken; - -import fj.data.Either; - public class SdcConnectorClient { private static final Logger log = LoggerFactory.getLogger(SdcConnectorClient.class.getName()); static final String CONTENT_DISPOSITION_HEADER = "Content-Disposition"; private final IConfiguration configuration; - private final HttpAsdcClient httpClient; + private final HttpSdcClient httpClient; - public SdcConnectorClient(IConfiguration configuration, HttpAsdcClient httpClient) { + public SdcConnectorClient(IConfiguration configuration, HttpSdcClient httpClient) { Objects.requireNonNull(configuration); Objects.requireNonNull(httpClient); this.configuration = configuration; @@ -76,23 +72,37 @@ public class SdcConnectorClient { } public Either, IDistributionClientResult> getValidArtifactTypesList() { - Pair getServersResponsePair = performAsdcServerRequest(); - HttpAsdcResponse getArtifactTypeResponse = getServersResponsePair.getFirst(); + Pair getServersResponsePair = performSdcServerRequest(SdcUrls.GET_VALID_ARTIFACT_TYPES); + HttpSdcResponse getArtifactTypeResponse = getServersResponsePair.getFirst(); Either, IDistributionClientResult> response; if (getArtifactTypeResponse.getStatus() == HttpStatus.SC_OK) { response = parseGetValidArtifactTypesResponse(getArtifactTypeResponse); } else { - IDistributionClientResult asdcError = handleAsdcError(getArtifactTypeResponse); - response = Either.right(asdcError); + IDistributionClientResult sdcError = handleSdcError(getArtifactTypeResponse); + response = Either.right(sdcError); } - handeAsdcConnectionClose(getServersResponsePair); + handeSdcConnectionClose(getServersResponsePair); return response; } - private void handeAsdcConnectionClose(Pair getServersResponsePair) { + public Either getKafkaDistData() { + Pair getServersResponsePair = performSdcServerRequest(SdcUrls.GET_KAFKA_DIST_DATA); + HttpSdcResponse getKafkaDistDataResponse = getServersResponsePair.getFirst(); + Either response; + if (getKafkaDistDataResponse.getStatus() == HttpStatus.SC_OK) { + response = parseGetKafkaDistDataResponse(getKafkaDistDataResponse); + } else { + IDistributionClientResult sdcError = handleSdcError(getKafkaDistDataResponse); + response = Either.right(sdcError); + } + handeSdcConnectionClose(getServersResponsePair); + return response; + } + + private void handeSdcConnectionClose(Pair getServersResponsePair) { if (getServersResponsePair.getSecond() != null) { try { getServersResponsePair.getSecond().close(); @@ -104,79 +114,17 @@ public class SdcConnectorClient { } } - private Pair performAsdcServerRequest() { + private Pair performSdcServerRequest(String sdcUrl) { String requestId = generateRequestId(); Map requestHeaders = addHeadersToHttpRequest(requestId); - log.debug("about to perform getServerList. requestId= {} url= {}", requestId, AsdcUrls.GET_VALID_ARTIFACT_TYPES); - return httpClient.getRequest(AsdcUrls.GET_VALID_ARTIFACT_TYPES, requestHeaders, false); - } - - public Either registerAsdcTopics(ApiCredential credential) { - - Either response; - - String requestId = generateRequestId(); - Map requestHeaders = addHeadersToHttpRequest(requestId); - - RegistrationRequest registrationRequest = new RegistrationRequest(credential.getApiKey(), configuration.getEnvironmentName(), configuration.isConsumeProduceStatusTopic(), configuration.getMsgBusAddress()); - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - String jsonRequest = gson.toJson(registrationRequest); - StringEntity body = new StringEntity(jsonRequest, ContentType.APPLICATION_JSON); - - log.debug("about to perform registerAsdcTopics. requestId= {} url= {}", requestId, AsdcUrls.POST_FOR_TOPIC_REGISTRATION); - Pair registerResponsePair = httpClient.postRequest(AsdcUrls.POST_FOR_TOPIC_REGISTRATION, body, requestHeaders, false); - HttpAsdcResponse registerResponse = registerResponsePair.getFirst(); - int status = registerResponse.getStatus(); - - if (status == HttpStatus.SC_OK) { - response = parseRegistrationResponse(registerResponse); - - } else { - DistributionClientResultImpl asdcError = handleAsdcError(registerResponse); - return Either.right(asdcError); - } - handeAsdcConnectionClose(registerResponsePair); - - log.debug("registerAsdcTopics response= {}. requestId= {} url= {}", status, requestId, AsdcUrls.POST_FOR_TOPIC_REGISTRATION); - return response; - + log.debug("about to perform get on SDC. requestId= {} url= {}", requestId, sdcUrl); + return httpClient.getRequest(sdcUrl, requestHeaders, false); } private String generateRequestId() { return UUID.randomUUID().toString(); } - public IDistributionClientResult unregisterTopics(ApiCredential credential) { - - DistributionClientResultImpl response; - - String requestId = generateRequestId(); - Map requestHeaders = addHeadersToHttpRequest(requestId); - - RegistrationRequest registrationRequest = new RegistrationRequest(credential.getApiKey(), configuration.getEnvironmentName(), configuration.isConsumeProduceStatusTopic(), configuration.getMsgBusAddress()); - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - String jsonRequest = gson.toJson(registrationRequest); - StringEntity body = new StringEntity(jsonRequest, ContentType.APPLICATION_JSON); - - log.debug("about to perform unregisterTopics. requestId= {} url= {}", requestId, AsdcUrls.POST_FOR_UNREGISTER); - Pair unRegisterResponsePair = httpClient.postRequest(AsdcUrls.POST_FOR_UNREGISTER, body, requestHeaders, false); - HttpAsdcResponse unRegisterResponse = unRegisterResponsePair.getFirst(); - int status = unRegisterResponse.getStatus(); - if (status == HttpStatus.SC_NO_CONTENT || status == HttpStatus.SC_OK) { - response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "unregistration successful"); - - } else { - response = handleAsdcError(unRegisterResponse); - } - - handeAsdcConnectionClose(unRegisterResponsePair); - - log.debug("unregisterTopics response = {}. requestId= {} url= {}", status, requestId, AsdcUrls.POST_FOR_UNREGISTER); - - return response; - - } - public DistributionClientDownloadResultImpl downloadArtifact(IArtifactInfo artifactInfo) { DistributionClientDownloadResultImpl response; @@ -186,23 +134,23 @@ public class SdcConnectorClient { requestHeaders.put(DistributionClientConstants.HEADER_INSTANCE_ID, configuration.getConsumerID()); requestHeaders.put(HttpHeaders.ACCEPT, ContentType.APPLICATION_OCTET_STREAM.toString()); String requestUrl = artifactInfo.getArtifactURL(); - Pair downloadPair = httpClient.getRequest(requestUrl, requestHeaders, false); - HttpAsdcResponse downloadResponse = downloadPair.getFirst(); + Pair downloadPair = httpClient.getRequest(requestUrl, requestHeaders, false); + HttpSdcResponse downloadResponse = downloadPair.getFirst(); int status = downloadResponse.getStatus(); if (status == HttpStatus.SC_OK) { response = parseDownloadArtifactResponse(artifactInfo, downloadResponse); } else { - response = handleAsdcDownloadArtifactError(downloadResponse); + response = handleSdcDownloadArtifactError(downloadResponse); } - handeAsdcConnectionClose(downloadPair); + handeSdcConnectionClose(downloadPair); return response; } /* **************************** private methods ********************************************/ - private Either, IDistributionClientResult> parseGetValidArtifactTypesResponse(HttpAsdcResponse getArtifactTypesResponse) { + private Either, IDistributionClientResult> parseGetValidArtifactTypesResponse(HttpSdcResponse getArtifactTypesResponse) { Either, IDistributionClientResult> result; try { String jsonMessage = IOUtils.toString(getArtifactTypesResponse.getMessage().getContent()); @@ -219,39 +167,33 @@ public class SdcConnectorClient { return result; } - private Either, IDistributionClientResult> handleParsingError(Exception e) { - Either, IDistributionClientResult> result; - log.error("failed to parse response from ASDC. error: ", e); - IDistributionClientResult response = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to parse response from ASDC"); - result = Either.right(response); - return result; - } - - Either parseRegistrationResponse(HttpAsdcResponse registerResponse) { - - String jsonMessage; + private Either parseGetKafkaDistDataResponse(HttpSdcResponse getKafkaDistDataResponse) { + Either result; try { - jsonMessage = IOUtils.toString(registerResponse.getMessage().getContent()); - + String jsonMessage = IOUtils.toString(getKafkaDistDataResponse.getMessage().getContent()); Gson gson = new GsonBuilder().create(); - TopicRegistrationResponse registrationResponse = gson.fromJson(jsonMessage, TopicRegistrationResponse.class); - - if (registrationResponse.getDistrNotificationTopicName() == null) { - DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.FAIL, "failed to receive notification topic from ASDC"); - return Either.right(response); - } - - if (registrationResponse.getDistrStatusTopicName() == null) { - DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.FAIL, "failed to receive status topic from ASDC"); - return Either.right(response); - } - return Either.left(registrationResponse); - + KafkaDataResponse kafkaData = gson.fromJson(jsonMessage, KafkaDataResponse.class); + result = Either.left(kafkaData); } catch (UnsupportedOperationException | IOException e) { - log.error("failed to parse response from ASDC. error: ", e); - DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to parse response from ASDC"); - return Either.right(response); + result = handleKafkaParsingError(e); } + return result; + } + + private Either handleKafkaParsingError(Exception e) { + Either result; + log.error("failed to parse kafka data response from SDC. error: ", e); + IDistributionClientResult response = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to parse kafka data response from SDC"); + result = Either.right(response); + return result; + } + + private Either, IDistributionClientResult> handleParsingError(Exception e) { + Either, IDistributionClientResult> result; + log.error("failed to parse response from SDC. error: ", e); + IDistributionClientResult response = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to parse response from SDC"); + result = Either.right(response); + return result; } protected Map addHeadersToHttpRequest(String requestId) { @@ -263,29 +205,29 @@ public class SdcConnectorClient { return requestHeaders; } - private DistributionClientResultImpl handleAsdcError(HttpAsdcResponse registerResponse) { + private DistributionClientResultImpl handleSdcError(HttpSdcResponse registerResponse) { int status = registerResponse.getStatus(); - DistributionClientResultImpl errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to send request to ASDC"); + DistributionClientResultImpl errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to send request to SDC"); if (status == HttpStatus.SC_UNAUTHORIZED) { - errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_AUTHENTICATION_FAILED, "authentication to ASDC failed for user " + configuration.getUser()); + errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_AUTHENTICATION_FAILED, "authentication to SDC failed for user " + configuration.getUser()); } else if (status == HttpStatus.SC_FORBIDDEN) { - errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_AUTHORIZATION_FAILED, "authorization failure for user " + configuration.getUser()); + errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_AUTHORIZATION_FAILED, "authorization failure for user " + configuration.getUser()); } else if (status == HttpStatus.SC_BAD_REQUEST) { - errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.BAD_REQUEST, "ASDC call failed due to missing information"); + errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.BAD_REQUEST, "SDC call failed due to missing information"); } else if (status == HttpStatus.SC_NOT_FOUND) { - errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_NOT_FOUND, "ASDC not found"); + errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_NOT_FOUND, "SDC not found"); } else if (status == HttpStatus.SC_INTERNAL_SERVER_ERROR) { - errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_SERVER_PROBLEM, "ASDC server problem"); + errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_SERVER_PROBLEM, "SDC server problem"); } else if (status == HttpStatus.SC_BAD_GATEWAY) { - errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, "ASDC server problem"); + errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_CONNECTION_FAILED, "SDC server problem"); } else if (status == HttpStatus.SC_GATEWAY_TIMEOUT) { - errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_SERVER_TIMEOUT, "ASDC server problem"); + errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SDC_SERVER_TIMEOUT, "SDC server problem"); } - log.error("status from ASDC is {}", registerResponse); + log.error("status from SDC is {}", registerResponse); log.error(errorResponse.toString()); try { String errorString = IOUtils.toString(registerResponse.getMessage().getContent()); - log.debug("error from ASDC is: {}", errorString); + log.debug("error from SDC is: {}", errorString); } catch (UnsupportedOperationException | IOException e) { log.error("During error handling another exception occurred: ", e); } @@ -293,27 +235,27 @@ public class SdcConnectorClient { } - private DistributionClientDownloadResultImpl handleAsdcDownloadArtifactError(HttpAsdcResponse registerResponse) { + private DistributionClientDownloadResultImpl handleSdcDownloadArtifactError(HttpSdcResponse registerResponse) { int status = registerResponse.getStatus(); - DistributionClientDownloadResultImpl errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to send request to ASDC"); + DistributionClientDownloadResultImpl errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "failed to send request to SDC"); if (status == HttpStatus.SC_UNAUTHORIZED) { - errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.ASDC_AUTHENTICATION_FAILED, "authentication to ASDC failed for user " + configuration.getUser()); + errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.SDC_AUTHENTICATION_FAILED, "authentication to SDC failed for user " + configuration.getUser()); } else if (status == HttpStatus.SC_FORBIDDEN) { - errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.ASDC_AUTHORIZATION_FAILED, "authorization failure for user " + configuration.getUser()); + errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.SDC_AUTHORIZATION_FAILED, "authorization failure for user " + configuration.getUser()); } else if (status == HttpStatus.SC_BAD_REQUEST || status == HttpStatus.SC_NOT_FOUND) { errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.ARTIFACT_NOT_FOUND, "Specified artifact is not found"); // } else if (status == 404){ // errorResponse = new DistributionClientDownloadResultImpl( - // DistributionActionResultEnum.ASDC_NOT_FOUND, - // "ASDC not found"); + // DistributionActionResultEnum.SDC_NOT_FOUND, + // "SDC not found"); } else if (status == HttpStatus.SC_INTERNAL_SERVER_ERROR) { - errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.ASDC_SERVER_PROBLEM, "ASDC server problem"); + errorResponse = new DistributionClientDownloadResultImpl(DistributionActionResultEnum.SDC_SERVER_PROBLEM, "SDC server problem"); } - log.error("status from ASDC is {}", registerResponse); + log.error("status from SDC is {}", registerResponse); log.error(errorResponse.toString()); try { String errorString = IOUtils.toString(registerResponse.getMessage().getContent()); - log.debug("error from ASDC is: {}", errorString); + log.debug("error from SDC is: {}", errorString); } catch (UnsupportedOperationException | IOException e) { log.error("During error handling another exception occurred: ", e); } @@ -321,7 +263,7 @@ public class SdcConnectorClient { } - private DistributionClientDownloadResultImpl parseDownloadArtifactResponse(IArtifactInfo artifactInfo, HttpAsdcResponse getServersResponse) { + private DistributionClientDownloadResultImpl parseDownloadArtifactResponse(IArtifactInfo artifactInfo, HttpSdcResponse getServersResponse) { HttpEntity entity = getServersResponse.getMessage(); InputStream is; try { @@ -333,7 +275,7 @@ public class SdcConnectorClient { byte[] payload = IOUtils.toByteArray(is); if (artifactInfo.getArtifactChecksum() == null || artifactInfo.getArtifactChecksum().isEmpty()) { - return new DistributionClientDownloadResultImpl(DistributionActionResultEnum.DATA_INTEGRITY_PROBLEM, "failed to get artifact from ASDC. Empty checksum"); + return new DistributionClientDownloadResultImpl(DistributionActionResultEnum.DATA_INTEGRITY_PROBLEM, "failed to get artifact from SDC. Empty checksum"); } return new DistributionClientDownloadResultImpl(DistributionActionResultEnum.SUCCESS, "success", artifactName, payload); diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcUrls.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcUrls.java new file mode 100644 index 0000000..4b9a648 --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcUrls.java @@ -0,0 +1,30 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.http; + +public class SdcUrls { + + private SdcUrls() { + + } + public static final String GET_VALID_ARTIFACT_TYPES = "/sdc/v1/artifactTypes"; + public static final String GET_KAFKA_DIST_DATA = "/sdc/v1/distributionKafkaData"; +} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/TopicRegistrationResponse.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/TopicRegistrationResponse.java deleted file mode 100644 index 448005c..0000000 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/TopicRegistrationResponse.java +++ /dev/null @@ -1,43 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * sdc-distribution-client - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.sdc.http; - -public class TopicRegistrationResponse { - private String distrNotificationTopicName; - private String distrStatusTopicName; - - - public void setDistrNotificationTopicName(String distrNotificationTopicName) { - this.distrNotificationTopicName = distrNotificationTopicName; - } - - public void setDistrStatusTopicName(String distrStatusTopicName) { - this.distrStatusTopicName = distrStatusTopicName; - } - - public String getDistrNotificationTopicName() { - return distrNotificationTopicName; - } - - public String getDistrStatusTopicName() { - return distrStatusTopicName; - } -} diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java index 2da9c4e..db4433b 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java @@ -28,7 +28,12 @@ import org.onap.sdc.utils.DistributionClientConstants; public class Configuration implements IConfiguration { private List msgBusAddressList; - private String asdcAddress; + private final String kafkaSecurityProtocolConfig; + private final String kafkaSaslMechanism; + private final String kafkaSaslJaasConfig; + private String sdcStatusTopicName; + private String sdcNotificationTopicName; + private String sdcAddress; private String user; private String password; private int pollingInterval = DistributionClientConstants.MIN_POLLING_INTERVAL_SEC; @@ -40,10 +45,9 @@ public class Configuration implements IConfiguration { private String keyStorePath; private String keyStorePassword; private boolean activateServerTLSAuth; - private boolean filterInEmptyResources; - private Boolean useHttpsWithDmaap; + private final boolean filterInEmptyResources; private Boolean useHttpsWithSDC; - private boolean consumeProduceStatusTopic; + private final boolean consumeProduceStatusTopic; private String httpProxyHost; private int httpProxyPort; private String httpsProxyHost; @@ -51,23 +55,24 @@ public class Configuration implements IConfiguration { private boolean useSystemProxy; public Configuration(IConfiguration other) { - this.asdcAddress = other.getAsdcAddress(); - this.msgBusAddressList = other.getMsgBusAddress(); + this.kafkaSecurityProtocolConfig = other.getKafkaSecurityProtocolConfig(); + this.kafkaSaslMechanism = other.getKafkaSaslMechanism(); + this.kafkaSaslJaasConfig = other.getKafkaSaslJaasConfig(); this.comsumerID = other.getConsumerID(); this.consumerGroup = other.getConsumerGroup(); - this.environmentName = other.getEnvironmentName(); - this.password = other.getPassword(); this.pollingInterval = other.getPollingInterval(); this.pollingTimeout = other.getPollingTimeout(); - this.relevantArtifactTypes = other.getRelevantArtifactTypes(); + this.environmentName = other.getEnvironmentName(); + this.consumeProduceStatusTopic = other.isConsumeProduceStatusTopic(); + this.sdcAddress = other.getSdcAddress(); this.user = other.getUser(); + this.password = other.getPassword(); + this.relevantArtifactTypes = other.getRelevantArtifactTypes(); this.useHttpsWithSDC = other.isUseHttpsWithSDC(); this.keyStorePath = other.getKeyStorePath(); this.keyStorePassword = other.getKeyStorePassword(); this.activateServerTLSAuth = other.activateServerTLSAuth(); this.filterInEmptyResources = other.isFilterInEmptyResources(); - this.useHttpsWithDmaap = other.isUseHttpsWithDmaap(); - this.consumeProduceStatusTopic = other.isConsumeProduceStatusTopic(); this.httpProxyHost = other.getHttpProxyHost(); this.httpProxyPort = other.getHttpProxyPort(); this.httpsProxyHost = other.getHttpsProxyHost(); @@ -76,8 +81,24 @@ public class Configuration implements IConfiguration { } @Override - public String getAsdcAddress() { - return asdcAddress; + public String getSdcAddress() { + return sdcAddress; + } + + public String getStatusTopicName() { + return sdcStatusTopicName; + } + + public void setStatusTopicName(String sdcStatusTopicName) { + this.sdcStatusTopicName = sdcStatusTopicName; + } + + public String getNotificationTopicName() { + return sdcNotificationTopicName; + } + + public void setNotificationTopicName(String sdcNotificationTopicName) { + this.sdcNotificationTopicName = sdcNotificationTopicName; } @Override @@ -85,6 +106,25 @@ public class Configuration implements IConfiguration { return msgBusAddressList; } + public void setMsgBusAddress(List newMsgBusAddress) { + msgBusAddressList = newMsgBusAddress; + } + + @Override + public String getKafkaSecurityProtocolConfig() { + return kafkaSecurityProtocolConfig; + } + + @Override + public String getKafkaSaslMechanism() { + return kafkaSaslMechanism; + } + + @Override + public String getKafkaSaslJaasConfig() { + return kafkaSaslJaasConfig; + } + @Override public Boolean isUseHttpsWithSDC() { return useHttpsWithSDC; @@ -169,8 +209,8 @@ public class Configuration implements IConfiguration { this.comsumerID = comsumerID; } - public void setAsdcAddress(String asdcAddress) { - this.asdcAddress = asdcAddress; + public void setSdcAddress(String sdcAddress) { + this.sdcAddress = sdcAddress; } public void setUser(String user) { @@ -243,19 +283,10 @@ public class Configuration implements IConfiguration { return this.filterInEmptyResources; } - @Override - public Boolean isUseHttpsWithDmaap() { - return this.useHttpsWithDmaap; - } - public void setUseHttpsWithSDC(boolean useHttpsWithSDC) { this.useHttpsWithSDC = useHttpsWithSDC; } - public void setUseHttpsWithDmaap(boolean useHttpsWithDmaap) { - this.useHttpsWithDmaap = useHttpsWithDmaap; - } - @Override public boolean isConsumeProduceStatusTopic() { return this.consumeProduceStatusTopic; @@ -265,11 +296,13 @@ public class Configuration implements IConfiguration { public String toString() { //@formatter:off return "Configuration [" - + "asdcAddress=" + asdcAddress + + "sdcAddress=" + sdcAddress + ", user=" + user + ", password=" + password + ", useHttpsWithSDC=" + useHttpsWithSDC + ", pollingInterval=" + pollingInterval + + ", sdcStatusTopicName=" + sdcStatusTopicName + + ", sdcNotificationTopicName=" + sdcNotificationTopicName + ", pollingTimeout=" + pollingTimeout + ", relevantArtifactTypes=" + relevantArtifactTypes + ", consumerGroup=" + consumerGroup @@ -279,7 +312,6 @@ public class Configuration implements IConfiguration { + ", keyStorePassword=" + keyStorePassword + ", activateServerTLSAuth=" + activateServerTLSAuth + ", filterInEmptyResources=" + filterInEmptyResources - + ", useHttpsWithDmaap=" + useHttpsWithDmaap + ", consumeProduceStatusTopic=" + consumeProduceStatusTopic + ", useSystemProxy=" + useSystemProxy + ", httpProxyHost=" + httpProxyHost diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java index b645ed1..829c6ce 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java @@ -19,23 +19,22 @@ */ package org.onap.sdc.impl; -import org.onap.sdc.api.consumer.IConfiguration; -import org.onap.sdc.api.consumer.IStatusCallback; -import org.onap.sdc.utils.DistributionActionResultEnum; -import org.onap.sdc.utils.DistributionClientConstants; - import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Function; import java.util.regex.Matcher; +import org.onap.sdc.api.consumer.IConfiguration; +import org.onap.sdc.api.consumer.IStatusCallback; +import org.onap.sdc.utils.DistributionActionResultEnum; +import org.onap.sdc.utils.DistributionClientConstants; public class ConfigurationValidator { private Map, DistributionActionResultEnum> cachedValidators; - DistributionActionResultEnum validateConfiguration(IConfiguration conf, IStatusCallback statusCallback) { + public DistributionActionResultEnum validateConfiguration(IConfiguration conf, IStatusCallback statusCallback) { final Map, DistributionActionResultEnum> validators = getValidators(statusCallback); for (Map.Entry, DistributionActionResultEnum> validation : validators.entrySet()) { @@ -53,10 +52,8 @@ public class ConfigurationValidator { validators.put(isCustomerIdNotSet(), DistributionActionResultEnum.CONF_MISSING_CONSUMER_ID); validators.put(isUserNotSet(), DistributionActionResultEnum.CONF_MISSING_USERNAME); validators.put(isPasswordNotSet(), DistributionActionResultEnum.CONF_MISSING_PASSWORD); - validators.put(isMsgBusAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS); - validators.put(isAsdcAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN); - validators.put(isFqdnValid(), DistributionActionResultEnum.CONF_INVALID_ASDC_FQDN); - validators.put(areFqdnsValid(), DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS); + validators.put(isSdcAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_SDC_FQDN); + validators.put(isFqdnValid(), DistributionActionResultEnum.CONF_INVALID_SDC_FQDN); validators.put(isEnvNameNotSet(), DistributionActionResultEnum.CONF_MISSING_ENVIRONMENT_NAME); validators.put(isRelevantArtifactTypesNotSet(), DistributionActionResultEnum.CONF_MISSING_ARTIFACT_TYPES); validators.put(isConsumeStatusTopicWithCallbackNotSet(statusCallback), DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG); @@ -85,17 +82,10 @@ public class ConfigurationValidator { return it -> it.getEnvironmentName() == null || it.getEnvironmentName().isEmpty(); } - private Function areFqdnsValid() { - return it -> !isValidFqdns(it.getMsgBusAddress()); - } - private Function isFqdnValid() { - return it -> !isValidFqdn(it.getAsdcAddress()); + return it -> !isValidFqdn(it.getSdcAddress()); } - private Function isMsgBusAddressNotSet() { - return it -> it.getMsgBusAddress() == null || it.getMsgBusAddress().isEmpty(); - } private Function isPasswordNotSet() { return it -> it.getPassword() == null || it.getPassword().isEmpty(); @@ -113,8 +103,8 @@ public class ConfigurationValidator { return it -> it.getConsumerID() == null || it.getConsumerID().isEmpty(); } - private Function isAsdcAddressNotSet() { - return it -> it.getAsdcAddress() == null || it.getAsdcAddress().isEmpty(); + private Function isSdcAddressNotSet() { + return it -> it.getSdcAddress() == null || it.getSdcAddress().isEmpty(); } static boolean isValidFqdn(String fqdn) { diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java index 3a25abb..a34ba1e 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java @@ -23,11 +23,12 @@ package org.onap.sdc.impl; import static java.util.Objects.isNull; -import java.io.IOException; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import fj.data.Either; import java.lang.reflect.Type; -import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; -import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -35,8 +36,8 @@ import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import org.apache.http.HttpHost; +import org.apache.kafka.common.KafkaException; import org.onap.sdc.api.IDistributionClient; import org.onap.sdc.api.IDistributionStatusMessageJsonBuilder; import org.onap.sdc.api.consumer.IComponentDoneStatusMessage; @@ -46,53 +47,35 @@ import org.onap.sdc.api.consumer.IFinalDistrStatusMessage; import org.onap.sdc.api.consumer.INotificationCallback; import org.onap.sdc.api.consumer.IStatusCallback; import org.onap.sdc.api.notification.IArtifactInfo; +import org.onap.sdc.api.notification.IVfModuleMetadata; import org.onap.sdc.api.results.IDistributionClientDownloadResult; import org.onap.sdc.api.results.IDistributionClientResult; -import org.onap.sdc.http.HttpAsdcClient; +import org.onap.sdc.http.HttpClientFactory; +import org.onap.sdc.http.HttpRequestFactory; +import org.onap.sdc.http.HttpSdcClient; import org.onap.sdc.http.SdcConnectorClient; -import org.onap.sdc.http.TopicRegistrationResponse; import org.onap.sdc.utils.DistributionActionResultEnum; import org.onap.sdc.utils.DistributionClientConstants; -import org.onap.sdc.utils.GeneralUtils; import org.onap.sdc.utils.NotificationSender; import org.onap.sdc.utils.Pair; import org.onap.sdc.utils.Wrapper; -import org.onap.sdc.api.notification.IVfModuleMetadata; +import org.onap.sdc.utils.kafka.KafkaDataResponse; +import org.onap.sdc.utils.kafka.SdcKafkaConsumer; +import org.onap.sdc.utils.kafka.SdcKafkaProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.apiClient.credentials.ApiCredential; -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClient.CambriaApiException; -import com.att.nsa.cambria.client.CambriaClientBuilders.AbstractAuthenticatedManagerBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.att.nsa.cambria.client.CambriaIdentityManager; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.reflect.TypeToken; - -import fj.data.Either; - public class DistributionClientImpl implements IDistributionClient { - private static final int POLLING_TIMEOUT_MULTIPLIER = 1000; private static final int TERMINATION_TIMEOUT = 60; private final Logger log; - private SdcConnectorClient asdcConnector; + private SdcConnectorClient sdcConnector; private ScheduledExecutorService executorPool = null; - protected CambriaIdentityManager cambriaIdentityManager = null; - private List brokerServers; - private ApiCredential credential; + private SdcKafkaProducer producer; protected Configuration configuration; private INotificationCallback callback; private IStatusCallback statusCallback; - private String notificationTopic; - private String statusTopic; private boolean isConsumerGroupGenerated = false; private NotificationSender notificationSender; private final ConfigurationValidator configurationValidator = new ConfigurationValidator(); @@ -109,13 +92,70 @@ public class DistributionClientImpl implements IDistributionClient { this.log = log; } + @Override + public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback, IStatusCallback statusCallback) { + IDistributionClientResult initResult; + if (!conf.isConsumeProduceStatusTopic()) { + initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, + "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'"); + + } else if (isNull(statusCallback)) { + initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, + "configuration is invalid: statusCallback is not defined"); + } else { + this.statusCallback = statusCallback; + initResult = init(conf, notificationCallback); + } + return initResult; + } + + @Override + public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback callback) { + + log.info("DistributionClient - init"); + + Wrapper errorWrapper = new Wrapper<>(); + validateNotInitilized(errorWrapper); + if (errorWrapper.isEmpty()) { + validateNotTerminated(errorWrapper); + } + if (errorWrapper.isEmpty()) { + this.configuration = validateAndInitConfiguration(errorWrapper, conf).getSecond(); + this.sdcConnector = createSdcConnector(configuration); + } + if (errorWrapper.isEmpty()) { + validateArtifactTypesWithSdcServer(conf, errorWrapper); + } + if (errorWrapper.isEmpty()) { + this.callback = callback; + } + if (errorWrapper.isEmpty()) { + initKafkaData(errorWrapper); + } + if (errorWrapper.isEmpty()) { + initKafkaProducer(errorWrapper, configuration); + } + if (errorWrapper.isEmpty()) { + this.notificationSender = new NotificationSender(producer); + } + IDistributionClientResult result; + if (errorWrapper.isEmpty()) { + isInitialized = true; + result = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, + "distribution client initialized successfully"); + } else { + result = errorWrapper.getInnerElement(); + } + + return result; + } + @Override public IConfiguration getConfiguration() { return configuration; } @Override - /* see javadoc */ public synchronized IDistributionClientResult updateConfiguration(IConfiguration conf) { log.info("update DistributionClient configuration"); @@ -126,33 +166,34 @@ public class DistributionClientImpl implements IDistributionClient { return errorWrapper.getInnerElement(); } - IDistributionClientResult updateResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "configuration updated successfuly"); + IDistributionClientResult updateResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, + "configuration updated successfully"); - boolean needToUpdateCambriaConsumer = false; + boolean needToUpdateConsumer = false; if (conf.getRelevantArtifactTypes() != null && !conf.getRelevantArtifactTypes().isEmpty()) { configuration.setRelevantArtifactTypes(conf.getRelevantArtifactTypes()); - needToUpdateCambriaConsumer = true; + needToUpdateConsumer = true; } if (isPollingIntervalValid(conf.getPollingInterval())) { configuration.setPollingInterval(conf.getPollingInterval()); - needToUpdateCambriaConsumer = true; + needToUpdateConsumer = true; } if (isPollingTimeoutValid(conf.getPollingTimeout())) { configuration.setPollingTimeout(conf.getPollingTimeout()); - needToUpdateCambriaConsumer = true; + needToUpdateConsumer = true; } if (conf.getConsumerGroup() != null) { configuration.setConsumerGroup(conf.getConsumerGroup()); isConsumerGroupGenerated = false; - needToUpdateCambriaConsumer = true; + needToUpdateConsumer = true; } else if (!isConsumerGroupGenerated) { String generatedConsumerGroup = UUID.randomUUID().toString(); configuration.setConsumerGroup(generatedConsumerGroup); isConsumerGroupGenerated = true; } - if (needToUpdateCambriaConsumer) { + if (needToUpdateConsumer) { updateResult = restartConsumer(); } @@ -167,7 +208,7 @@ public class DistributionClientImpl implements IDistributionClient { log.info("start DistributionClient"); IDistributionClientResult startResult; - CambriaConsumer cambriaNotificationConsumer = null; + SdcKafkaConsumer kafkaConsumer = null; Wrapper errorWrapper = new Wrapper<>(); validateRunReady(errorWrapper); if (errorWrapper.isEmpty()) { @@ -175,49 +216,49 @@ public class DistributionClientImpl implements IDistributionClient { } if (errorWrapper.isEmpty()) { try { - cambriaNotificationConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(notificationTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) - .withSocketTimeout(configuration.getPollingTimeout() * POLLING_TIMEOUT_MULTIPLIER).build(); - - } catch (MalformedURLException | GeneralSecurityException e) { - handleCambriaInitFailure(errorWrapper, e); + kafkaConsumer = new SdcKafkaConsumer(configuration); + kafkaConsumer.subscribe(configuration.getNotificationTopicName()); + } catch (KafkaException | IllegalArgumentException e) { + handleMessagingClientInitFailure(errorWrapper, e); } } if (errorWrapper.isEmpty()) { - - List relevantArtifactTypes = configuration.getRelevantArtifactTypes(); - // Remove nulls from list - workaround for how configuration is built - relevantArtifactTypes.removeAll(Collections.singleton(null)); - NotificationConsumer consumer = new NotificationConsumer(cambriaNotificationConsumer, callback, relevantArtifactTypes, this); - executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); - executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); - - handleStatusConsumer(errorWrapper, executorPool); + startNotificationConsumer(kafkaConsumer); + startStatusConsumer(errorWrapper, executorPool); } if (!errorWrapper.isEmpty()) { startResult = errorWrapper.getInnerElement(); } else { - startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client started successfuly"); + startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, + "distribution client started successfully"); isStarted = true; } return startResult; } - private void handleStatusConsumer(Wrapper errorWrapper, ScheduledExecutorService executorPool) { + private void startNotificationConsumer(SdcKafkaConsumer kafkaConsumer) { + List relevantArtifactTypes = configuration.getRelevantArtifactTypes(); + // Remove nulls from list - workaround for how configuration is built + relevantArtifactTypes.removeAll(Collections.singleton(null)); + NotificationConsumer consumer = new NotificationConsumer(kafkaConsumer, callback, relevantArtifactTypes, this); + executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); + executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); + } + + private void startStatusConsumer(Wrapper errorWrapper, ScheduledExecutorService executorPool) { if (configuration.isConsumeProduceStatusTopic()) { - CambriaConsumer cambriaStatusConsumer; try { - cambriaStatusConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) - .withSocketTimeout(configuration.getPollingTimeout() * POLLING_TIMEOUT_MULTIPLIER).build(); - StatusConsumer statusConsumer = new StatusConsumer(cambriaStatusConsumer, statusCallback); + SdcKafkaConsumer kafkaConsumer = new SdcKafkaConsumer(configuration); + kafkaConsumer.subscribe(configuration.getStatusTopicName()); + StatusConsumer statusConsumer = new StatusConsumer(kafkaConsumer, statusCallback); executorPool.scheduleAtFixedRate(statusConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); - } catch (MalformedURLException | GeneralSecurityException e) { - handleCambriaInitFailure(errorWrapper, e); + } catch (KafkaException | IllegalArgumentException e) { + handleMessagingClientInitFailure(errorWrapper, e); } } } @Override - /* see javadoc */ public synchronized IDistributionClientResult stop() { log.info("stop DistributionClient"); @@ -226,30 +267,13 @@ public class DistributionClientImpl implements IDistributionClient { if (!errorWrapper.isEmpty()) { return errorWrapper.getInnerElement(); } - // 1. stop polling notification topic shutdownExecutor(); - // 2. send to ASDC unregister to topic - IDistributionClientResult unregisterResult = asdcConnector.unregisterTopics(credential); - if (unregisterResult.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { - log.info("client failed to unregister from topics"); - } else { - log.info("client unregistered from topics successfully"); - } - asdcConnector.close(); - - try { - cambriaIdentityManager.deleteCurrentApiKey(); - } catch (HttpException | IOException e) { - log.debug("failed to delete cambria keys", e); - } - cambriaIdentityManager.close(); - + sdcConnector.close(); isInitialized = false; isTerminated = true; - DistributionClientResultImpl stopResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client stopped successfuly"); - return stopResult; + return new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client stopped successfully"); } @Override @@ -259,167 +283,59 @@ public class DistributionClientImpl implements IDistributionClient { validateRunReady(errorWrapper); if (!errorWrapper.isEmpty()) { IDistributionClientResult result = errorWrapper.getInnerElement(); - IDistributionClientDownloadResult downloadResult = new DistributionClientDownloadResultImpl(result.getDistributionActionResult(), result.getDistributionMessageResult()); - return downloadResult; + return new DistributionClientDownloadResultImpl(result.getDistributionActionResult(), result.getDistributionMessageResult()); } - return asdcConnector.downloadArtifact(artifactInfo); + return sdcConnector.downloadArtifact(artifactInfo); } - @Override - public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback, - IStatusCallback statusCallback) { - IDistributionClientResult initResult; - if (!conf.isConsumeProduceStatusTopic()) { - initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'"); - - } else if (isNull(statusCallback)) { - initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: statusCallback is not defined"); - } else { - this.statusCallback = statusCallback; - initResult = init(conf, notificationCallback); - } - return initResult; + SdcConnectorClient createSdcConnector(Configuration configuration) { + return new SdcConnectorClient(configuration, new HttpSdcClient(configuration.getSdcAddress(), + new HttpClientFactory(configuration), + new HttpRequestFactory(configuration.getUser(), configuration.getPassword()))); } - @Override - /* - * see javadoc - */ - public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback callback) { - - log.info("DistributionClient - init"); - - Wrapper errorWrapper = new Wrapper<>(); - validateNotInitilized(errorWrapper); - if (errorWrapper.isEmpty()) { - validateNotTerminated(errorWrapper); - } - if (errorWrapper.isEmpty()) { - this.configuration = validateAndInitConfiguration(errorWrapper, conf).getSecond(); - this.asdcConnector = createAsdcConnector(this.configuration); - } - // 1. get ueb server list from configuration - if (errorWrapper.isEmpty()) { - List servers = initUebServerList(errorWrapper); - if (servers != null) { - this.brokerServers = servers; - } - } - // 2.validate artifact types against asdc server - if (errorWrapper.isEmpty()) { - validateArtifactTypesWithAsdcServer(conf, errorWrapper); - } - // 3. create keys - if (errorWrapper.isEmpty()) { - this.callback = callback; - ApiCredential apiCredential = createUebKeys(errorWrapper); - if (apiCredential != null) { - this.credential = apiCredential; - } - } - // 4. register for topics - if (errorWrapper.isEmpty()) { - TopicRegistrationResponse topics = registerForTopics(errorWrapper, this.credential); - if (topics != null) { - this.notificationTopic = topics.getDistrNotificationTopicName(); - this.statusTopic = topics.getDistrStatusTopicName(); - this.notificationSender = createNotificationSender(); - } - } - - IDistributionClientResult result; - if (errorWrapper.isEmpty()) { - isInitialized = true; - result = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client initialized successfuly"); - } else { - result = errorWrapper.getInnerElement(); - } - - return result; - } - - SdcConnectorClient createAsdcConnector(Configuration configuration) { - return new SdcConnectorClient(configuration, new HttpAsdcClient(configuration)); - } - - private NotificationSender createNotificationSender() { - return new NotificationSender(brokerServers); - } - - private TopicRegistrationResponse registerForTopics(Wrapper errorWrapper, ApiCredential credential) { - Either registerAsdcTopics = asdcConnector.registerAsdcTopics(credential); - if (registerAsdcTopics.isRight()) { - - try { - cambriaIdentityManager.deleteCurrentApiKey(); - } catch (HttpException | IOException e) { - log.debug("failed to delete cambria keys", e); - } - errorWrapper.setInnerElement(registerAsdcTopics.right().value()); - } else { - return registerAsdcTopics.left().value(); - } - return null; - } - - private ApiCredential createUebKeys(Wrapper errorWrapper) { - ApiCredential apiCredential = null; - - initCambriaClient(errorWrapper); - if (errorWrapper.isEmpty()) { - log.debug("create keys"); - Pair uebKeys = createUebKeys(); - DistributionClientResultImpl createKeysResponse = uebKeys.getFirst(); - apiCredential = uebKeys.getSecond(); - if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { - errorWrapper.setInnerElement(createKeysResponse); - } - } - return apiCredential; - } - - private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper errorWrapper) { - Either, IDistributionClientResult> eitherValidArtifactTypesList = asdcConnector.getValidArtifactTypesList(); + private void validateArtifactTypesWithSdcServer(IConfiguration conf, Wrapper errorWrapper) { + Either, IDistributionClientResult> eitherValidArtifactTypesList = sdcConnector.getValidArtifactTypesList(); if (eitherValidArtifactTypesList.isRight()) { DistributionActionResultEnum errorType = eitherValidArtifactTypesList.right().value().getDistributionActionResult(); - // Support the case of a new client and older ASDC Server which does not have the API - if (errorType != DistributionActionResultEnum.ASDC_NOT_FOUND) { + // Support the case of a new client and older SDC Server which does not have the API + if (errorType != DistributionActionResultEnum.SDC_NOT_FOUND) { errorWrapper.setInnerElement(eitherValidArtifactTypesList.right().value()); } } else { - final List artifactTypesFromAsdc = eitherValidArtifactTypesList.left().value(); - boolean isArtifactTypesValid = artifactTypesFromAsdc.containsAll(conf.getRelevantArtifactTypes()); + final List artifactTypesFromSdc = eitherValidArtifactTypesList.left().value(); + boolean isArtifactTypesValid = artifactTypesFromSdc.containsAll(conf.getRelevantArtifactTypes()); if (!isArtifactTypesValid) { - List invalidArtifactTypes = new ArrayList<>(); - invalidArtifactTypes.addAll(conf.getRelevantArtifactTypes()); - invalidArtifactTypes.removeAll(artifactTypesFromAsdc); + List invalidArtifactTypes = new ArrayList<>(conf.getRelevantArtifactTypes()); + invalidArtifactTypes.removeAll(artifactTypesFromSdc); DistributionClientResultImpl errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_CONTAINS_INVALID_ARTIFACT_TYPES, - "configuration contains invalid artifact types:" + invalidArtifactTypes + " valid types are:" + artifactTypesFromAsdc); + "configuration contains invalid artifact types:" + invalidArtifactTypes + " valid types are:" + artifactTypesFromSdc); errorWrapper.setInnerElement(errorResponse); } else { - log.debug("Artifact types: {} were validated with ASDC server", conf.getRelevantArtifactTypes()); + log.debug("Artifact types: {} were validated with SDC server", conf.getRelevantArtifactTypes()); } } } - private List initUebServerList(Wrapper errorWrapper) { - List brokerServers = null; - log.debug("get ueb cluster server list from component(configuration file)"); - - Either, IDistributionClientResult> serverListResponse = getUEBServerList(); - if (serverListResponse.isRight()) { - errorWrapper.setInnerElement(serverListResponse.right().value()); + private void initKafkaData(Wrapper errorWrapper) { + log.debug("Get MessageBus cluster information from SDC"); + Either kafkaData = sdcConnector.getKafkaDistData(); + if (kafkaData.isRight()) { + errorWrapper.setInnerElement(kafkaData.right().value()); } else { - brokerServers = serverListResponse.left().value(); + KafkaDataResponse kafkaDataResponse = kafkaData.left().value(); + configuration.setMsgBusAddress(Collections.singletonList(kafkaDataResponse.getKafkaBootStrapServer())); + configuration.setNotificationTopicName(kafkaDataResponse.getDistrNotificationTopicName()); + configuration.setStatusTopicName(kafkaDataResponse.getDistrStatusTopicName()); + log.debug("MessageBus cluster info retrieved successfully {}", kafkaData.left().value()); } - - return brokerServers; } private void validateNotInitilized(Wrapper errorWrapper) { if (isInitialized) { log.warn("distribution client already initialized"); - DistributionClientResultImpl alreadyInitResponse = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, "distribution client already initialized"); + DistributionClientResultImpl alreadyInitResponse = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, + "distribution client already initialized"); errorWrapper.setInnerElement(alreadyInitResponse); } } @@ -431,31 +347,17 @@ public class DistributionClientImpl implements IDistributionClient { } private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder statusBuilder) { - IDistributionClientResult distributionResult; - - Either cambriaPublisher = getCambriaPublisher(statusTopic, configuration, brokerServers, credential); - if (cambriaPublisher.isRight()) { - distributionResult = cambriaPublisher.right().value(); - } else { - String statusMessage = statusBuilder.build(); - distributionResult = notificationSender.send(cambriaPublisher.left().value(), statusMessage); - } - - return distributionResult; + return notificationSender.send(configuration.getStatusTopicName(), statusBuilder.build()); } - private Either getCambriaPublisher(String statusTopic, Configuration configuration, List brokerServers, ApiCredential credential) { - CambriaBatchingPublisher cambriaPublisher = null; + private void initKafkaProducer(Wrapper errorWrapper, Configuration configuration) { try { - cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()) - .usingHosts(brokerServers).build(); - cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); - } catch (MalformedURLException | GeneralSecurityException e) { - Wrapper errorWrapper = new Wrapper<>(); - handleCambriaInitFailure(errorWrapper, e); - return Either.right(errorWrapper.getInnerElement()); + if (producer == null) { + producer = new SdcKafkaProducer(configuration); + } + } catch (KafkaException | IllegalStateException e) { + handleMessagingClientInitFailure(errorWrapper, e); } - return Either.left(cambriaPublisher); } @Override @@ -471,27 +373,17 @@ public class DistributionClientImpl implements IDistributionClient { if (!errorWrapper.isEmpty()) { return errorWrapper.getInnerElement(); } - IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified); + IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus( + getConfiguration().getConsumerID(), + currentTimeMillis, + distributionId, + artifactInfo, + isNotified); return sendStatus(builder); } /* *************************** Private Methods *************************************************** */ - protected Pair createUebKeys() { - DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly"); - ApiCredential credential = null; - try { - String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID()); - credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description); - cambriaIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); - - } catch (HttpException | CambriaApiException | IOException e) { - response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage()); - log.error(response.toString()); - } - return new Pair<>(response, credential); - } - private IDistributionClientResult restartConsumer() { shutdownExecutor(); return start(); @@ -500,43 +392,36 @@ public class DistributionClientImpl implements IDistributionClient { protected Pair validateAndInitConfiguration(Wrapper errorWrapper, IConfiguration conf) { DistributionActionResultEnum result = configurationValidator.validateConfiguration(conf, statusCallback); - Configuration configuration = null; + Configuration configurationInit = null; if (result == DistributionActionResultEnum.SUCCESS) { - configuration = createConfiguration(conf); + configurationInit = createConfiguration(conf); } else { DistributionClientResultImpl initResult = new DistributionClientResultImpl(result, "configuration is invalid: " + result.name()); log.error(initResult.toString()); errorWrapper.setInnerElement(initResult); } - return new Pair<>(result, configuration); + return new Pair<>(result, configurationInit); } private Configuration createConfiguration(IConfiguration conf) { - Configuration configuration = new Configuration(conf); + Configuration configurationCreate = new Configuration(conf); if (!isPollingIntervalValid(conf.getPollingInterval())) { - configuration.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); + configurationCreate.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); } if (!isPollingTimeoutValid(conf.getPollingTimeout())) { - configuration.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC); + configurationCreate.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC); } if (conf.getConsumerGroup() == null) { String generatedConsumerGroup = UUID.randomUUID().toString(); - configuration.setConsumerGroup(generatedConsumerGroup); + configurationCreate.setConsumerGroup(generatedConsumerGroup); isConsumerGroupGenerated = true; } - //Default use HTTPS with SDC if (conf.isUseHttpsWithSDC() == null) { - configuration.setUseHttpsWithSDC(true); + configurationCreate.setUseHttpsWithSDC(true); } - - //Default use HTTPS with DMAAP - if (conf.isUseHttpsWithDmaap() == null) { - configuration.setUseHttpsWithDmaap(true); - } - - return configuration; + return configurationCreate; } private void shutdownExecutor() { @@ -577,7 +462,8 @@ public class DistributionClientImpl implements IDistributionClient { private void validateInitialized(Wrapper errorWrapper) { if (!isInitialized) { log.debug("client was not initialized"); - IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, + "distribution client was not initialized"); errorWrapper.setInnerElement(result); } } @@ -585,7 +471,8 @@ public class DistributionClientImpl implements IDistributionClient { private void validateNotStarted(Wrapper errorWrapper) { if (isStarted) { log.debug("client already started"); - IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_STARTED, "distribution client already started"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_STARTED, + "distribution client already started"); errorWrapper.setInnerElement(result); } } @@ -593,7 +480,8 @@ public class DistributionClientImpl implements IDistributionClient { private void validateNotTerminated(Wrapper errorWrapper) { if (isTerminated) { log.debug("client was terminated"); - IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_IS_TERMINATED, "distribution client was terminated"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_IS_TERMINATED, + "distribution client was terminated"); errorWrapper.setInnerElement(result); } } @@ -616,23 +504,10 @@ public class DistributionClientImpl implements IDistributionClient { return isValid; } - private synchronized void initCambriaClient(Wrapper errorWrapper) { - if (cambriaIdentityManager == null) { - try { - AbstractAuthenticatedManagerBuilder managerBuilder = new IdentityManagerBuilder().usingHosts(brokerServers); - if (configuration.isUseHttpsWithDmaap()) { - managerBuilder = managerBuilder.usingHttps(); - } - cambriaIdentityManager = managerBuilder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - handleCambriaInitFailure(errorWrapper, e); - } - } - } - private void handleCambriaInitFailure(Wrapper errorWrapper, Exception e) { - final String errorMessage = "Failed initilizing cambria component:" + e.getMessage(); - IDistributionClientResult errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CAMBRIA_INIT_FAILED, errorMessage); + private void handleMessagingClientInitFailure(Wrapper errorWrapper, Exception e) { + final String errorMessage = "Failed initializing messaging component:" + e.getMessage(); + IDistributionClientResult errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.MESSAGING_CLIENT_INIT_FAILED, errorMessage); errorWrapper.setInnerElement(errorResponse); log.error(errorMessage); log.debug(errorMessage, e); @@ -682,8 +557,7 @@ public class DistributionClientImpl implements IDistributionClient { String vfModuleJsonString = new String(artifactPayload, StandardCharsets.UTF_8); final Type type = new TypeToken>() { }.getType(); - List vfModules = gson.fromJson(vfModuleJsonString, type); - return vfModules; + return gson.fromJson(vfModuleJsonString, type); } @@ -702,27 +576,12 @@ public class DistributionClientImpl implements IDistributionClient { } - - public Either, IDistributionClientResult> getUEBServerList() { - List msgBusAddresses = configuration.getMsgBusAddress(); - if (msgBusAddresses.isEmpty()) { - return Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS, "Message bus address was not found in the config file")); - } else if (getHttpProxyHost() == null && getHttpsProxyHost() == null) { - // If there is no proxy configured, convert to valid host name - return GeneralUtils.convertToValidHostName(msgBusAddresses); - } else { - // skip the IP address lookup when proxy is configured and treat all - // hosts as valid - return Either.left(msgBusAddresses); - } - } private HttpHost getHttpProxyHost() { HttpHost proxyHost = null; - if (configuration.isUseSystemProxy() && System.getProperty("http.proxyHost") != null - && System.getProperty("http.proxyPort") != null) { + if (Boolean.TRUE.equals(configuration.isUseSystemProxy() && System.getProperty("http.proxyHost") != null) && System.getProperty("http.proxyPort") != null) { proxyHost = new HttpHost(System.getProperty("http.proxyHost"), - Integer.valueOf(System.getProperty("http.proxyPort"))); + Integer.parseInt(System.getProperty("http.proxyPort"))); } else if (configuration.getHttpProxyHost() != null && configuration.getHttpProxyPort() != 0) { proxyHost = new HttpHost(configuration.getHttpProxyHost(), configuration.getHttpProxyPort()); } @@ -731,10 +590,9 @@ public class DistributionClientImpl implements IDistributionClient { private HttpHost getHttpsProxyHost() { HttpHost proxyHost = null; - if (configuration.isUseSystemProxy() && System.getProperty("https.proxyHost") != null - && System.getProperty("https.proxyPort") != null) { + if (configuration.isUseSystemProxy() && System.getProperty("https.proxyHost") != null && System.getProperty("https.proxyPort") != null) { proxyHost = new HttpHost(System.getProperty("https.proxyHost"), - Integer.valueOf(System.getProperty("https.proxyPort"))); + Integer.parseInt(System.getProperty("https.proxyPort"))); } else if (configuration.getHttpsProxyHost() != null && configuration.getHttpsProxyPort() != 0) { proxyHost = new HttpHost(configuration.getHttpsProxyHost(), configuration.getHttpsProxyPort()); } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java index 4edd355..77655ac 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java @@ -25,8 +25,8 @@ import org.onap.sdc.utils.DistributionActionResultEnum; public class DistributionClientResultImpl implements IDistributionClientResult { - private DistributionActionResultEnum responseStatus; - private String responseMessage; + private final DistributionActionResultEnum responseStatus; + private final String responseMessage; public DistributionClientResultImpl(DistributionActionResultEnum responseStatus, String responseMessage) { this.responseStatus = responseStatus; diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java index bf28d97..c59612a 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java @@ -20,34 +20,32 @@ package org.onap.sdc.impl; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import java.util.ArrayList; import java.util.List; - +import org.onap.sdc.api.consumer.INotificationCallback; import org.onap.sdc.api.notification.IArtifactInfo; +import org.onap.sdc.api.notification.INotificationData; import org.onap.sdc.api.notification.IResourceInstance; import org.onap.sdc.api.results.IDistributionClientResult; -import org.onap.sdc.utils.DistributionActionResultEnum; -import org.onap.sdc.api.consumer.INotificationCallback; -import org.onap.sdc.api.notification.INotificationData; import org.onap.sdc.utils.ArtifactTypeEnum; +import org.onap.sdc.utils.DistributionActionResultEnum; +import org.onap.sdc.utils.kafka.SdcKafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - class NotificationConsumer implements Runnable { - private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName()); + private static final Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName()); - private CambriaConsumer cambriaConsumer; - private INotificationCallback clientCallback; - private List artifactsTypes; - private DistributionClientImpl distributionClient; + private final SdcKafkaConsumer kafkaConsumer; + private final INotificationCallback clientCallback; + private final List artifactsTypes; + private final DistributionClientImpl distributionClient; - NotificationConsumer(CambriaConsumer cambriaConsumer, INotificationCallback clientCallback, List artifactsTypes, DistributionClientImpl distributionClient) { - this.cambriaConsumer = cambriaConsumer; + NotificationConsumer(SdcKafkaConsumer kafkaConsumer, INotificationCallback clientCallback, List artifactsTypes, DistributionClientImpl distributionClient) { + this.kafkaConsumer = kafkaConsumer; this.clientCallback = clientCallback; this.artifactsTypes = artifactsTypes; this.distributionClient = distributionClient; @@ -55,16 +53,16 @@ class NotificationConsumer implements Runnable { @Override public void run() { - try { Gson gson = new GsonBuilder().setPrettyPrinting().create(); long currentTimeMillis = System.currentTimeMillis(); - for (String notificationMsg : cambriaConsumer.fetch()) { + log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName()); + for (String notificationMsg : kafkaConsumer.poll()) { log.debug("received message from topic"); - log.debug("recieved notification from broker: {}", notificationMsg); + log.debug("received notification from broker: {}", notificationMsg); - final NotificationDataImpl notificationFromUEB = gson.fromJson(notificationMsg, NotificationDataImpl.class); - NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromUEB); + final NotificationDataImpl notificationFromMessageBus = gson.fromJson(notificationMsg, NotificationDataImpl.class); + NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromMessageBus); if (isActivateCallback(notificationForCallback)) { String stringNotificationForCallback = gson.toJson(notificationForCallback); log.debug("sending notification to client: {}", stringNotificationForCallback); @@ -73,8 +71,8 @@ class NotificationConsumer implements Runnable { } } catch (Exception e) { - log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage()); - log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e); + log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage()); + log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e); } } @@ -85,21 +83,21 @@ class NotificationConsumer implements Runnable { return hasRelevantArtifactsInResourceInstance || hasRelevantArtifactsInService; } - protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromUEB) { - List relevantResourceInstances = buildResourceInstancesLogic(notificationFromUEB, currentTimeMillis); - List relevantServiceArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, notificationFromUEB.getServiceArtifactsImpl()); - notificationFromUEB.setResources(relevantResourceInstances); - notificationFromUEB.setServiceArtifacts(relevantServiceArtifacts); - return notificationFromUEB; + protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromMessageBus) { + List relevantResourceInstances = buildResourceInstancesLogic(notificationFromMessageBus, currentTimeMillis); + List relevantServiceArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, notificationFromMessageBus.getServiceArtifactsImpl()); + notificationFromMessageBus.setResources(relevantResourceInstances); + notificationFromMessageBus.setServiceArtifacts(relevantServiceArtifacts); + return notificationFromMessageBus; } - private List buildResourceInstancesLogic(NotificationDataImpl notificationFromUEB, long currentTimeMillis) { + private List buildResourceInstancesLogic(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis) { List relevantResourceInstances = new ArrayList<>(); - for (JsonContainerResourceInstance resourceInstance : notificationFromUEB.getResourcesImpl()) { + for (JsonContainerResourceInstance resourceInstance : notificationFromMessageBus.getResourcesImpl()) { final List artifactsImplList = resourceInstance.getArtifactsImpl(); - List foundRelevantArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, artifactsImplList); + List foundRelevantArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, artifactsImplList); if (!foundRelevantArtifacts.isEmpty() || distributionClient.getConfiguration().isFilterInEmptyResources()) { resourceInstance.setArtifacts(foundRelevantArtifacts); relevantResourceInstances.add(resourceInstance); @@ -109,17 +107,17 @@ class NotificationConsumer implements Runnable { } - private List handleRelevantArtifacts(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List artifactsImplList) { + private List handleRelevantArtifacts(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List artifactsImplList) { List relevantArtifacts = new ArrayList<>(); if (artifactsImplList != null) { for (ArtifactInfoImpl artifactInfo : artifactsImplList) { - handleRelevantArtifact(notificationFromUEB, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo); + handleRelevantArtifact(notificationFromMessageBus, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo); } } return relevantArtifacts; } - private void handleRelevantArtifact(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List artifactsImplList, List relevantArtifacts, ArtifactInfoImpl artifactInfo) { + private void handleRelevantArtifact(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List artifactsImplList, List relevantArtifacts, ArtifactInfoImpl artifactInfo) { boolean isArtifactRelevant = artifactsTypes.contains(artifactInfo.getArtifactType()); String artifactType = artifactInfo.getArtifactType(); if (artifactInfo.getGeneratedFromUUID() != null && !artifactInfo.getGeneratedFromUUID().isEmpty()) { @@ -131,16 +129,16 @@ class NotificationConsumer implements Runnable { } } if (isArtifactRelevant) { - setRelatedArtifacts(artifactInfo, notificationFromUEB); + setRelatedArtifacts(artifactInfo, notificationFromMessageBus); if (artifactType.equals(ArtifactTypeEnum.HEAT.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_VOL.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_NET.name())) { setGeneratedArtifact(artifactsImplList, artifactInfo); } relevantArtifacts.add(artifactInfo); } - IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromUEB.getDistributionID(), artifactInfo, isArtifactRelevant); + IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromMessageBus.getDistributionID(), artifactInfo, isArtifactRelevant); if (notificationStatus.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { - log.error("Error failed to send notification status to UEB failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult()); + log.error("Error failed to send notification status to MessageBus failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult()); } } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java index 5951ed0..2c69330 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java @@ -20,24 +20,23 @@ package org.onap.sdc.impl; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.onap.sdc.api.consumer.IStatusCallback; import org.onap.sdc.api.notification.IStatusData; +import org.onap.sdc.utils.kafka.SdcKafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - class StatusConsumer implements Runnable { - private static Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName()); + private static final Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName()); - private CambriaConsumer cambriaConsumer; - private IStatusCallback clientCallback; + private final SdcKafkaConsumer kafkaConsumer; + private final IStatusCallback clientCallback; - StatusConsumer(CambriaConsumer cambriaConsumer, IStatusCallback clientCallback) { - this.cambriaConsumer = cambriaConsumer; + StatusConsumer(SdcKafkaConsumer kafkaConsumer, IStatusCallback clientCallback) { + this.kafkaConsumer = kafkaConsumer; this.clientCallback = clientCallback; } @@ -46,18 +45,16 @@ class StatusConsumer implements Runnable { try { Gson gson = new GsonBuilder().setPrettyPrinting().create(); - for (String statusMsg : cambriaConsumer.fetch()) { + log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName()); + for (String statusMsg : kafkaConsumer.poll()) { log.debug("received message from topic"); - log.debug("recieved notification from broker: {}", statusMsg); + log.debug("received notification from broker: {}", statusMsg); IStatusData statusData = gson.fromJson(statusMsg, StatusDataImpl.class); clientCallback.activateCallback(statusData); - - } - } catch (Exception e) { - log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage()); - log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e); + log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage()); + log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e); } } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java index 514630f..834751a 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java @@ -36,23 +36,19 @@ public enum DistributionActionResultEnum { CONFIGURATION_IS_MISSING, CONF_MISSING_USERNAME, CONF_MISSING_PASSWORD, - CONF_MISSING_ASDC_FQDN, + CONF_MISSING_SDC_FQDN, CONF_MISSING_ARTIFACT_TYPES, CONF_CONTAINS_INVALID_ARTIFACT_TYPES, CONF_MISSING_CONSUMER_ID, CONF_MISSING_ENVIRONMENT_NAME, - CONF_MISSING_CONSUMER_GROUP, - CONF_INVALID_ASDC_FQDN, + CONF_INVALID_SDC_FQDN, CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, - CONF_MISSING_MSG_BUS_ADDRESS, - CONF_INVALID_MSG_BUS_ADDRESS, - ASDC_AUTHENTICATION_FAILED, - ASDC_AUTHORIZATION_FAILED, - ASDC_NOT_FOUND, - ASDC_SERVER_PROBLEM, - ASDC_CONNECTION_FAILED, - ASDC_SERVER_TIMEOUT, + SDC_AUTHENTICATION_FAILED, + SDC_AUTHORIZATION_FAILED, + SDC_NOT_FOUND, + SDC_SERVER_PROBLEM, + SDC_CONNECTION_FAILED, + SDC_SERVER_TIMEOUT, - CAMBRIA_INIT_FAILED, - UEB_KEYS_CREATION_FAILED + MESSAGING_CLIENT_INIT_FAILED } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java index 8432611..f4ebbcc 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java @@ -28,7 +28,7 @@ import java.util.regex.Pattern; * @author mshitrit */ public final class DistributionClientConstants { - public static final String CLIENT_DESCRIPTION = "ASDC Distribution Client Key for %s"; + public static final String CLIENT_DESCRIPTION = "SDC Distribution Client Key for %s"; public static final Pattern FQDN_PATTERN = Pattern.compile( "^" + "([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9])(\\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9]))*(:[0-9]{2,5})*$", Pattern.CASE_INSENSITIVE); diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java index ff5d201..6a69d8b 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java @@ -70,24 +70,4 @@ public class GeneralUtils { } return isEncoded; } - - - public static Either, IDistributionClientResult> convertToValidHostName(List msgBusAddresses) { - List uebLocalHostsNames = new ArrayList<>(); - for (String name : msgBusAddresses) { - try { - uebLocalHostsNames.add(InetAddress.getByName(name).getHostName()); - } catch (UnknownHostException e) { - LOGGER.debug("UnknownHost: {}", e.getMessage(), e); - } - } - Either, IDistributionClientResult> response; - if (uebLocalHostsNames.isEmpty()) { - response = Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS, "configuration is invalid: " + DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS.name())); - - } else { - response = Either.left(uebLocalHostsNames); - } - return response; - } } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java index 1fb71a6..44a9ddb 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java @@ -20,55 +20,46 @@ package org.onap.sdc.utils; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaPublisher; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.KafkaException; import org.onap.sdc.api.results.IDistributionClientResult; import org.onap.sdc.impl.DistributionClientResultImpl; +import org.onap.sdc.utils.kafka.SdcKafkaProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; - public class NotificationSender { private static final Logger log = LoggerFactory.getLogger(NotificationSender.class); - private static final long PUBLISHER_CLOSING_TIMEOUT = 10L; private static final long SLEEP_TIME = 1; + private final SdcKafkaProducer producer; - private final List brokerServers; - - public NotificationSender(List brokerServers) { - this.brokerServers = brokerServers; + public NotificationSender(SdcKafkaProducer producer) { + this.producer = producer; } - public IDistributionClientResult send(CambriaBatchingPublisher publisher, String status) { + public IDistributionClientResult send(String topic, String status) { log.info("DistributionClient - sendStatus"); DistributionClientResultImpl distributionResult; try { - log.debug("Publisher server list: {}", brokerServers); - log.debug("Trying to send status: {}", status); - publisher.send("MyPartitionKey", status); + log.debug("Publisher server list: {}", producer.getMsgBusAddresses()); + log.info("Trying to send status: {} \n to topic {}", status, producer.getTopicName()); + producer.send(topic, "MyPartitionKey", status); TimeUnit.SECONDS.sleep(SLEEP_TIME); - } catch (IOException | InterruptedException e) { - log.error("DistributionClient - sendDownloadStatus. Failed to send download status", e); + } catch (KafkaException | InterruptedException e) { + log.error("DistributionClient - sendStatus. Failed to send status", e); } finally { - distributionResult = closePublisher(publisher); + distributionResult = closeProducer(); } return distributionResult; } - private DistributionClientResultImpl closePublisher(CambriaBatchingPublisher publisher) { + private DistributionClientResultImpl closeProducer() { DistributionClientResultImpl distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); try { - List notSentMessages = publisher.close(PUBLISHER_CLOSING_TIMEOUT, TimeUnit.SECONDS); - if (notSentMessages.isEmpty()) { - distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); - } else { - log.debug("DistributionClient - sendDownloadStatus. {} messages were not sent", notSentMessages.size()); - } - } catch (IOException | InterruptedException e) { + producer.flush(); + distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); + } catch (KafkaException | IllegalArgumentException e) { log.error("DistributionClient - sendDownloadStatus. Failed to send messages and close publisher.", e); } return distributionResult; diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/KafkaDataResponse.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/KafkaDataResponse.java new file mode 100644 index 0000000..ac1d2ea --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/KafkaDataResponse.java @@ -0,0 +1,35 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.utils.kafka; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +public class KafkaDataResponse { + + private String kafkaBootStrapServer; + private String distrNotificationTopicName; + private String distrStatusTopicName; +} \ No newline at end of file diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java new file mode 100644 index 0000000..71f793d --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java @@ -0,0 +1,100 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.utils.kafka; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.onap.sdc.impl.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class that provides a KafkaConsumer to communicate with a kafka cluster + */ +public class SdcKafkaConsumer { + + private static final Logger log = LoggerFactory.getLogger(SdcKafkaConsumer.class); + final KafkaConsumer consumer; + private final int pollTimeout; + private String topicName; + + /** + * + * @param configuration The config provided to the client + */ + public SdcKafkaConsumer(Configuration configuration) { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getMsgBusAddress()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig()); + props.put(SaslConfigs.SASL_MECHANISM, configuration.getKafkaSaslMechanism()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.getConsumerGroup()); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + consumer = new KafkaConsumer<>(props); + pollTimeout = configuration.getPollingTimeout(); + } + + /** + * + * @param topic The kafka topic to subscribe to + */ + public void subscribe(String topic) { + try { + consumer.subscribe(Collections.singleton(topic)); + this.topicName = topic; + } + catch (InvalidGroupIdException e) { + log.error("Invalid Group {}", e.getMessage()); + } + } + + /** + * + * @return The list of records returned from the poll + */ + public List poll() { + List msgs = new ArrayList<>(); + ConsumerRecords records = consumer.poll(Duration.ofSeconds(pollTimeout)); + for (ConsumerRecord rec : records) { + msgs.add(rec.value()); + } + return msgs; + } + + public String getTopicName() { + return topicName; + } +} \ No newline at end of file diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaProducer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaProducer.java new file mode 100644 index 0000000..9826f8b --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaProducer.java @@ -0,0 +1,98 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.utils.kafka; + +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.Future; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; +import org.onap.sdc.impl.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class that provides a KafkaProducer to communicate with a kafka cluster + */ +public class SdcKafkaProducer { + + private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class); + final KafkaProducer producer; + private final List msgBusAddresses; + private final String topicName; + + public SdcKafkaProducer(Configuration configuration) { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getMsgBusAddress()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig()); + props.put(SaslConfigs.SASL_MECHANISM, configuration.getKafkaSaslMechanism()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-producer-" + UUID.randomUUID()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer<>(props); + msgBusAddresses = configuration.getMsgBusAddress(); + topicName = configuration.getStatusTopicName(); + } + + /** + * + * @param topicName The name of the topic to publish to + * @param key The key value of the ProducerRecord + * @param value The value of the ProducerRecord + * @return The RecordMetedata of the request + */ + public Future send(String topicName, String key, String value) { + Future data; + try { + data = producer.send(new ProducerRecord<>(topicName, key, value)); + } catch (KafkaException e) { + log.error("Failed the send data: exc {}", e.getMessage()); + throw e; + } + return data; + } + /** + * + */ + public void flush() { + try { + producer.flush(); + } + catch (KafkaException e) { + log.error("Failed to send data: exc {}", e.getMessage()); + } + } + + public List getMsgBusAddresses() { + return msgBusAddresses; + } + + public String getTopicName() { + return topicName; + } +} \ No newline at end of file diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/api/asdc/RegistrationRequestTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/api/asdc/RegistrationRequestTest.java deleted file mode 100644 index 304cb56..0000000 --- a/sdc-distribution-client/src/test/java/org/onap/sdc/api/asdc/RegistrationRequestTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * SDC - * ================================================================================ - * Copyright (C) 2019 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.sdc.api.asdc; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.Collections; -import java.util.List; -import org.junit.jupiter.api.Test; - -class RegistrationRequestTest { - - private static final List DIST_ENV_END_POINTS = Collections.emptyList(); - private static final boolean IS_CONSUMER_TO_SDC_DISTR_STATUS_TOPIC = true; - private static final String ENV_NAME = "ENV_NAME"; - private static final String API_KEY = "API_KEY"; - - @Test - void testConstructorShouldSetProperties() { - RegistrationRequest registrationRequest = - new RegistrationRequest(API_KEY, ENV_NAME, IS_CONSUMER_TO_SDC_DISTR_STATUS_TOPIC, DIST_ENV_END_POINTS); - assertEquals(API_KEY, registrationRequest.getApiPublicKey()); - assertEquals(DIST_ENV_END_POINTS, registrationRequest.getDistEnvEndPoints()); - assertEquals(ENV_NAME, registrationRequest.getDistrEnvName()); - assertTrue(registrationRequest.getIsConsumerToSdcDistrStatusTopic()); - } -} diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientResponseTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientResponseTest.java index 8a912d9..4ff01c5 100644 --- a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientResponseTest.java +++ b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientResponseTest.java @@ -33,8 +33,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -class HttpAsdcClientResponseTest { - +class HttpSdcClientResponseTest { public static Collection data() { return Arrays.asList(new Object[][]{ {HttpStatus.SC_INTERNAL_SERVER_ERROR, "failed to send request"}, @@ -47,14 +46,14 @@ class HttpAsdcClientResponseTest { @MethodSource("data") void shouldCreateHttpResponse(int httpStatusCode, String httpMessage) throws IOException { // when - final HttpAsdcResponse response = HttpAsdcClient.createHttpResponse(httpStatusCode, httpMessage); + final HttpSdcResponse response = HttpSdcClient.createHttpResponse(httpStatusCode, httpMessage); // then assertEquals(httpStatusCode, response.getStatus()); assertEquals(httpMessage, getResponseMessage(response)); } - private String getResponseMessage(HttpAsdcResponse response) throws IOException { + private String getResponseMessage(HttpSdcResponse response) throws IOException { return IOUtils.toString(response.getMessage().getContent(), StandardCharsets.UTF_8); } -} + } diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientTest.java index 2dcfd5d..cbd12c3 100644 --- a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientTest.java +++ b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpAsdcClientTest.java @@ -43,8 +43,7 @@ import org.onap.sdc.utils.Pair; import org.onap.sdc.utils.TestConfiguration; @ExtendWith(MockitoExtension.class) -class HttpAsdcClientTest { - +class HttpSdcClientTest { private static final String URL = "http://127.0.0.1:8080/target"; private static final int HTTP_OK = 200; private static final String K_1 = "k1"; @@ -71,14 +70,14 @@ class HttpAsdcClientTest { final HttpRequestFactory httpRequestFactory = new HttpRequestFactory( configuration.getUser(), configuration.getPassword()); - final HttpAsdcClient httpAsdcClient = new HttpAsdcClient( - configuration.getAsdcAddress(), + final HttpSdcClient httpSdcClient = new HttpSdcClient( + configuration.getSdcAddress(), new HttpClientFactory(configuration), httpRequestFactory); // then - assertNotNull(httpAsdcClient); - assertEquals(HttpClientFactory.HTTP, httpAsdcClient.getHttpSchema()); + assertNotNull(httpSdcClient); + assertEquals(HttpClientFactory.HTTP, httpSdcClient.getHttpSchema()); } @Test @@ -91,25 +90,25 @@ class HttpAsdcClientTest { final HttpRequestFactory httpRequestFactory = new HttpRequestFactory( configuration.getUser(), configuration.getPassword()); - final HttpAsdcClient httpAsdcClient = new HttpAsdcClient( - configuration.getAsdcAddress(), + final HttpSdcClient httpSdcClient = new HttpSdcClient( + configuration.getSdcAddress(), new HttpClientFactory(configuration), httpRequestFactory); // then - assertNotNull(httpAsdcClient); - assertEquals(HttpClientFactory.HTTPS, httpAsdcClient.getHttpSchema()); + assertNotNull(httpSdcClient); + assertEquals(HttpClientFactory.HTTPS, httpSdcClient.getHttpSchema()); } @Test void shouldSendGetRequestWithoutAnyError() throws IOException { // given TestConfiguration configuration = givenHttpConfiguration(); - final HttpAsdcClient httpAsdcClient = createTestObj(HttpClientFactory.HTTP, configuration, httpClient); + final HttpSdcClient httpSdcClient = createTestObj(HttpClientFactory.HTTP, configuration, httpClient); CloseableHttpResponse httpResponse = givenHttpResponse(true); // when - final HttpAsdcResponse response = httpAsdcClient.getRequest(URL, HEADERS_MAP); + final HttpSdcResponse response = httpSdcClient.getRequest(URL, HEADERS_MAP); // then assertThat(response).isNotNull(); @@ -127,11 +126,11 @@ class HttpAsdcClientTest { void shouldSendPostRequestWithoutAnyError() throws IOException { // given TestConfiguration configuration = givenHttpConfiguration(); - final HttpAsdcClient httpAsdcClient = createTestObj(HttpClientFactory.HTTP, configuration, httpClient); + final HttpSdcClient httpSdcClient = createTestObj(HttpClientFactory.HTTP, configuration, httpClient); CloseableHttpResponse httpResponse = givenHttpResponse(false); // when - final HttpAsdcResponse response = httpAsdcClient.postRequest(URL, httpEntity, HEADERS_MAP); + final HttpSdcResponse response = httpSdcClient.postRequest(URL,httpEntity, HEADERS_MAP); // then assertThat(response).isNotNull(); @@ -141,17 +140,17 @@ class HttpAsdcClientTest { } - private HttpAsdcClient createTestObj(String httpProtocol, TestConfiguration configuration, CloseableHttpClient httpClient) { + private HttpSdcClient createTestObj(String httpProtocol, TestConfiguration configuration, CloseableHttpClient httpClient) { final HttpRequestFactory httpRequestFactory = new HttpRequestFactory( configuration.getUser(), configuration.getPassword()); HttpClientFactory httpClientFactory = mock(HttpClientFactory.class); when(httpClientFactory.createInstance()).thenReturn(new Pair<>(httpProtocol, httpClient)); - final HttpAsdcClient httpAsdcClient = new HttpAsdcClient( - configuration.getAsdcAddress(), + final HttpSdcClient httpSdcClient = new HttpSdcClient( + configuration.getSdcAddress(), httpClientFactory, httpRequestFactory); - return httpAsdcClient; + return httpSdcClient; } private CloseableHttpResponse givenHttpResponse(HttpEntity httpEntity, Header[] headers, boolean includeGetAllHeaders) { diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpClientFactoryTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpClientFactoryTest.java index 347b7f5..2292fc4 100644 --- a/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpClientFactoryTest.java +++ b/sdc-distribution-client/src/test/java/org/onap/sdc/http/HttpClientFactoryTest.java @@ -41,7 +41,7 @@ class HttpClientFactoryTest { TestConfiguration config = spy(new TestConfiguration()); HttpClientFactory httpClientFactory = new HttpClientFactory(config); when(config.activateServerTLSAuth()).thenReturn(true); - when(config.getKeyStorePath()).thenReturn("src/test/resources/asdc-client.jks"); + when(config.getKeyStorePath()).thenReturn("src/test/resources/sdc-client.jks"); when(config.getKeyStorePassword()).thenReturn("Aa123456"); Pair client = httpClientFactory.createInstance(); SSLConnectionSocketFactory sslsf = spy(SSLConnectionSocketFactory.getSocketFactory()); @@ -74,13 +74,13 @@ class HttpClientFactoryTest { } @Test - void shouldReturnSSLConnectionError() throws HttpAsdcClientException { + void shouldReturnSSLConnectionError() throws HttpSdcClientException{ TestConfiguration config = spy(new TestConfiguration()); HttpClientFactory httpClientFactory = new HttpClientFactory(config); when(config.activateServerTLSAuth()).thenReturn(true); when(config.getKeyStorePath()).thenReturn("src/test/resources/dummy.jks"); when(config.getKeyStorePassword()).thenReturn("Aa123456"); - assertThrows(HttpAsdcClientException.class, () -> httpClientFactory.createInstance()); + assertThrows(HttpSdcClientException.class, () -> httpClientFactory.createInstance()); } } diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/http/SdcConnectorClientTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/http/SdcConnectorClientTest.java index b09de78..61b8388 100644 --- a/sdc-distribution-client/src/test/java/org/onap/sdc/http/SdcConnectorClientTest.java +++ b/sdc-distribution-client/src/test/java/org/onap/sdc/http/SdcConnectorClientTest.java @@ -22,20 +22,14 @@ package org.onap.sdc.http; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.att.nsa.apiClient.credentials.ApiCredential; import com.google.common.hash.Hashing; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -43,199 +37,180 @@ import fj.data.Either; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.commons.io.IOUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.client.methods.CloseableHttpResponse; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Matchers; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.onap.sdc.api.asdc.RegistrationRequest; import org.onap.sdc.api.consumer.IConfiguration; import org.onap.sdc.api.notification.IArtifactInfo; import org.onap.sdc.api.results.IDistributionClientResult; -import org.onap.sdc.impl.DistributionClientResultImpl; import org.onap.sdc.utils.DistributionActionResultEnum; import org.onap.sdc.utils.Pair; +import org.onap.sdc.utils.kafka.KafkaDataResponse; -class SdcConnectorClientTest { +public class SdcConnectorClientTest { + + private static final Gson gson = new GsonBuilder().create(); + private static final HttpSdcClient httpClient = mock(HttpSdcClient.class); + private static final IConfiguration configuration = mock(IConfiguration.class); + private static final HttpSdcResponse httpSdcResponse = mock(HttpSdcResponse.class); + private static final Map mockHeaders = new HashMap<>(); + private static SdcConnectorClient sdcClient; - private static final String MOCK_ENV = "MockEnv"; - private static final String MOCK_API_KEY = "MockApikey"; private static final String ARTIFACT_URL = "http://127.0.0.1/artifact/url"; private static final String IT_JUST_DIDN_T_WORK = "It just didn't work"; private static final List ARTIFACT_TYPES = Arrays.asList("Service", "Resource", "VF", "VFC"); - private static final int PORT = 49512; - private static final byte[] BYTES = new byte[]{0xA, 0xB, 0xC, 0xD}; - private static Gson gson = new GsonBuilder().create(); private static final String VALID_JSON_PAYLOAD = gson.toJson(ARTIFACT_TYPES); - private static HttpAsdcClient httpClient = mock(HttpAsdcClient.class); - private static IConfiguration configuration = mock(IConfiguration.class); - private static ApiCredential apiCredential = mock(ApiCredential.class); - private static HttpAsdcResponse httpAsdcResponse = mock(HttpAsdcResponse.class); - @SuppressWarnings("unchecked") - private static Either mockResponse = - Mockito.mock(Either.class); - private static Map mockHeaders = new HashMap<>(); - private static SdcConnectorClient asdcClient; - Pair mockPair = new Pair<>(httpAsdcResponse, null); - private HttpEntity lastHttpEntity = null; + private static final String KAFKA_DATA = "{\"kafkaBootStrapServer\": \"onap-strimzi-kafka-bootstrap:9092\",\"distrNotificationTopicName\": \"SDC-DISTR-NOTIF-TOPIC-AUTO\",\"distrStatusTopicName\": \"SDC-DISTR-STATUS-TOPIC-AUTO\"}"; + private static final String VALID_KAFKA_JSON_PAYLOAD = gson.toJson(KAFKA_DATA); + private static final int PORT = 49512; + private static final byte[] BYTES = new byte[] {0xA, 0xB, 0xC, 0xD}; @BeforeAll public static void beforeClass() { - asdcClient = Mockito.spy(new SdcConnectorClient(configuration, httpClient)); - when(apiCredential.getApiKey()).thenReturn(MOCK_API_KEY); - when(httpAsdcResponse.getStatus()).thenReturn(HttpStatus.SC_OK); + sdcClient = Mockito.spy(new SdcConnectorClient(configuration, httpClient)); + when(httpSdcResponse.getStatus()).thenReturn(HttpStatus.SC_OK); - doReturn(mockHeaders).when(asdcClient).addHeadersToHttpRequest(Mockito.anyString()); - doReturn(mockResponse).when(asdcClient).parseRegistrationResponse(httpAsdcResponse); - } - - @BeforeEach - public void beforeMethod() { - Mockito.reset(configuration, httpClient); - lastHttpEntity = null; - when(configuration.getEnvironmentName()).thenReturn(MOCK_ENV); - - doAnswer(new Answer>() { - @Override - public Pair answer(InvocationOnMock invocation) throws Throwable { - lastHttpEntity = invocation.getArgument(1, HttpEntity.class); - return mockPair; - } - }).when(httpClient).postRequest(eq(AsdcUrls.POST_FOR_TOPIC_REGISTRATION), any(HttpEntity.class), eq(mockHeaders), eq(false)); + doReturn(mockHeaders).when(sdcClient).addHeadersToHttpRequest(Mockito.anyString()); } @Test - void initAndCloseTest() { + public void initAndCloseTest() { IConfiguration conf = Mockito.mock(IConfiguration.class); when(conf.getUser()).thenReturn("user"); when(conf.getPassword()).thenReturn("password"); when(conf.isUseHttpsWithSDC()).thenReturn(true); when(conf.activateServerTLSAuth()).thenReturn(false); - final HttpAsdcClient httpClient = new HttpAsdcClient(conf); + final HttpSdcClient httpClient = new HttpSdcClient(conf); SdcConnectorClient client = new SdcConnectorClient(conf, httpClient); client.close(); - - assertThrows(IllegalStateException.class, () -> { - //check if client is really closed - httpClient.getRequest(AsdcUrls.POST_FOR_TOPIC_REGISTRATION, new HashMap<>()); - }); - } @Test - void testConsumeProduceStatusTopicFalse() throws UnsupportedOperationException, IOException { - testConsumeProduceStatusTopic(false); + void getValidKafkaDataHappyScenarioTest() throws IOException { + HttpSdcResponse responseMock = mock(HttpSdcResponse.class); + CloseableHttpResponse closeableHttpResponseMock = mock(CloseableHttpResponse.class); + HttpEntity messageMock = mock(HttpEntity.class); + Pair responsePair = + new Pair<>(responseMock, closeableHttpResponseMock); + + when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK); + when(responseMock.getMessage()).thenReturn(messageMock); + when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(KAFKA_DATA.getBytes())); + when(httpClient.getRequest(eq(SdcUrls.GET_KAFKA_DIST_DATA), Matchers.any(), eq(false))) + .thenReturn(responsePair); + + Either result = sdcClient.getKafkaDistData(); + assertTrue(result.isLeft()); + KafkaDataResponse kafkaDataResponse = result.left().value(); + assertEquals("SDC-DISTR-NOTIF-TOPIC-AUTO", kafkaDataResponse.getDistrNotificationTopicName()); } @Test - void testConsumeProduceStatusTopicTrue() throws UnsupportedOperationException, IOException { - testConsumeProduceStatusTopic(true); - } + void getValidKafkaDataErrorResponseScenarioTest() throws IOException { + HttpSdcResponse responseMock = mock(HttpSdcResponse.class); + HttpEntity messageMock = mock(HttpEntity.class); + Pair responsePair = new Pair<>(responseMock, null); + + when(responseMock.getStatus()).thenReturn(HttpStatus.SC_GATEWAY_TIMEOUT); + when(responseMock.getMessage()).thenReturn(messageMock); + when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(IT_JUST_DIDN_T_WORK.getBytes())); + when(httpClient.getRequest(eq(SdcUrls.GET_KAFKA_DIST_DATA), Matchers.any(), eq(false))) + .thenReturn(responsePair); - private void testConsumeProduceStatusTopic(final boolean isConsumeProduceStatusFlag) throws IOException { - when(configuration.isConsumeProduceStatusTopic()).thenReturn(isConsumeProduceStatusFlag); - asdcClient.registerAsdcTopics(apiCredential); - verify(httpClient, times(1)).postRequest(eq(AsdcUrls.POST_FOR_TOPIC_REGISTRATION), any(HttpEntity.class), eq(mockHeaders), eq(false)); - assertNotNull(lastHttpEntity); - RegistrationRequest actualRegRequest - = gson.fromJson(IOUtils.toString(lastHttpEntity.getContent(), StandardCharsets.UTF_8), RegistrationRequest.class); - RegistrationRequest expectedRegRequest - = gson.fromJson(excpectedStringBody(isConsumeProduceStatusFlag), RegistrationRequest.class); - - assertEquals(expectedRegRequest.getApiPublicKey(), actualRegRequest.getApiPublicKey()); - assertEquals(expectedRegRequest.getDistrEnvName(), actualRegRequest.getDistrEnvName()); - assertEquals(expectedRegRequest.getIsConsumerToSdcDistrStatusTopic(), actualRegRequest.getIsConsumerToSdcDistrStatusTopic()); + Either result = sdcClient.getKafkaDistData(); + assertTrue(result.isRight()); + IDistributionClientResult distributionClientResult = result.right().value(); + assertEquals(DistributionActionResultEnum.SDC_SERVER_TIMEOUT, + distributionClientResult.getDistributionActionResult()); } @Test - void getValidArtifactTypesListHappyScenarioTest() throws IOException { - HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class); + public void getValidArtifactTypesListHappyScenarioTest() throws IOException { + HttpSdcResponse responseMock = mock(HttpSdcResponse.class); CloseableHttpResponse closeableHttpResponseMock = mock(CloseableHttpResponse.class); HttpEntity messageMock = mock(HttpEntity.class); - Pair responsePair = + Pair responsePair = new Pair<>(responseMock, closeableHttpResponseMock); when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK); when(responseMock.getMessage()).thenReturn(messageMock); when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(VALID_JSON_PAYLOAD.getBytes())); - when(httpClient.getRequest(eq(AsdcUrls.GET_VALID_ARTIFACT_TYPES), any(), eq(false))) + when(httpClient.getRequest(eq(SdcUrls.GET_VALID_ARTIFACT_TYPES), Matchers.any(), eq(false))) .thenReturn(responsePair); - Either, IDistributionClientResult> result = asdcClient.getValidArtifactTypesList(); + Either, IDistributionClientResult> result = sdcClient.getValidArtifactTypesList(); assertTrue(result.isLeft()); List list = result.left().value(); assertEquals(ARTIFACT_TYPES, list); } @Test - void getValidArtifactTypesListErrorResponseScenarioTest() throws IOException { - HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class); + public void getValidArtifactTypesListErrorResponseScenarioTest() throws IOException { + HttpSdcResponse responseMock = mock(HttpSdcResponse.class); HttpEntity messageMock = mock(HttpEntity.class); - Pair responsePair = new Pair<>(responseMock, null); + Pair responsePair = new Pair<>(responseMock, null); when(responseMock.getStatus()).thenReturn(HttpStatus.SC_GATEWAY_TIMEOUT); when(responseMock.getMessage()).thenReturn(messageMock); when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(IT_JUST_DIDN_T_WORK.getBytes())); - when(httpClient.getRequest(eq(AsdcUrls.GET_VALID_ARTIFACT_TYPES), any(), eq(false))) + when(httpClient.getRequest(eq(SdcUrls.GET_VALID_ARTIFACT_TYPES), Matchers.any(), eq(false))) .thenReturn(responsePair); - Either, IDistributionClientResult> result = asdcClient.getValidArtifactTypesList(); + Either, IDistributionClientResult> result = sdcClient.getValidArtifactTypesList(); assertTrue(result.isRight()); IDistributionClientResult distributionClientResult = result.right().value(); - assertEquals(DistributionActionResultEnum.ASDC_SERVER_TIMEOUT, + assertEquals(DistributionActionResultEnum.SDC_SERVER_TIMEOUT, distributionClientResult.getDistributionActionResult()); } @Test - void getValidArtifactTypesListExceptionDuringConnectionClosingTest() throws IOException { - HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class); + public void getValidArtifactTypesListExceptionDuringConnectionClosingTest() throws IOException { + HttpSdcResponse responseMock = mock(HttpSdcResponse.class); CloseableHttpResponse closeableHttpResponseMock = mock(CloseableHttpResponse.class); HttpEntity messageMock = mock(HttpEntity.class); - Pair responsePair = + Pair responsePair = new Pair<>(responseMock, closeableHttpResponseMock); when(responseMock.getStatus()).thenReturn(HttpStatus.SC_GATEWAY_TIMEOUT); when(responseMock.getMessage()).thenReturn(messageMock); when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(VALID_JSON_PAYLOAD.getBytes())); - when(httpClient.getRequest(eq(AsdcUrls.GET_VALID_ARTIFACT_TYPES), any(), eq(false))) + when(httpClient.getRequest(eq(SdcUrls.GET_VALID_ARTIFACT_TYPES), Matchers.any(), eq(false))) .thenReturn(responsePair); doThrow(new IOException("Test exception")).when(closeableHttpResponseMock).close(); - Either, IDistributionClientResult> result = asdcClient.getValidArtifactTypesList(); + Either, IDistributionClientResult> result = sdcClient.getValidArtifactTypesList(); assertTrue(result.isRight()); IDistributionClientResult distributionClientResult = result.right().value(); - assertEquals(DistributionActionResultEnum.ASDC_SERVER_TIMEOUT, + assertEquals(DistributionActionResultEnum.SDC_SERVER_TIMEOUT, distributionClientResult.getDistributionActionResult()); } @Test - void getValidArtifactTypesListParsingExceptionHandlingTest() throws IOException { - HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class); + public void getValidArtifactTypesListParsingExceptionHandlingTest() throws IOException { + HttpSdcResponse responseMock = mock(HttpSdcResponse.class); CloseableHttpResponse closeableHttpResponseMock = mock(CloseableHttpResponse.class); HttpEntity messageMock = mock(HttpEntity.class); - Pair responsePair = + Pair responsePair = new Pair<>(responseMock, closeableHttpResponseMock); when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK); when(responseMock.getMessage()).thenReturn(messageMock); when(messageMock.getContent()).thenReturn(new ThrowingInputStreamForTesting()); - when(httpClient.getRequest(eq(AsdcUrls.GET_VALID_ARTIFACT_TYPES), any(), eq(false))) + when(httpClient.getRequest(eq(SdcUrls.GET_VALID_ARTIFACT_TYPES), Matchers.any(), eq(false))) .thenReturn(responsePair); - Either, IDistributionClientResult> result = asdcClient.getValidArtifactTypesList(); + Either, IDistributionClientResult> result = sdcClient.getValidArtifactTypesList(); assertTrue(result.isRight()); IDistributionClientResult distributionClientResult = result.right().value(); assertEquals(DistributionActionResultEnum.GENERAL_ERROR, @@ -243,59 +218,17 @@ class SdcConnectorClientTest { } @Test - void unregisterTopicsErrorDuringProcessingTest() throws IOException { - when(configuration.getAsdcAddress()).thenReturn("127.0.0.1" + PORT); - when(configuration.isConsumeProduceStatusTopic()).thenReturn(false); - when(configuration.getMsgBusAddress()) - .thenReturn(Arrays.asList("http://127.0.0.1:45321/dmaap", "http://127.0.0.1:45321/dmaap")); - - String failMessage = "It just didn't work"; - HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class); - HttpEntity messageMock = mock(HttpEntity.class); - Pair responsePair = new Pair<>(responseMock, null); - - when(responseMock.getStatus()).thenReturn(HttpStatus.SC_BAD_GATEWAY); - when(responseMock.getMessage()).thenReturn(messageMock); - when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(failMessage.getBytes())); - doReturn(responsePair).when(httpClient) - .postRequest(eq(AsdcUrls.POST_FOR_UNREGISTER), any(HttpEntity.class), any(), eq(false)); - - IDistributionClientResult result = asdcClient.unregisterTopics(apiCredential); - assertEquals(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, result.getDistributionActionResult()); - } - - @Test - void unregisterTopicsHappyScenarioTest() throws IOException { - when(configuration.getAsdcAddress()).thenReturn("127.0.0.1" + PORT); - when(configuration.isConsumeProduceStatusTopic()).thenReturn(false); - - String failMessage = ""; - HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class); - HttpEntity messageMock = mock(HttpEntity.class); - Pair responsePair = new Pair<>(responseMock, null); - - when(responseMock.getStatus()).thenReturn(HttpStatus.SC_NO_CONTENT); - when(responseMock.getMessage()).thenReturn(messageMock); - when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(failMessage.getBytes())); - doReturn(responsePair).when(httpClient) - .postRequest(eq(AsdcUrls.POST_FOR_UNREGISTER), any(HttpEntity.class), any(), eq(false)); - - IDistributionClientResult result = asdcClient.unregisterTopics(apiCredential); - assertEquals(DistributionActionResultEnum.SUCCESS, result.getDistributionActionResult()); - } - - @Test - void downloadArtifactHappyScenarioTest() throws IOException { + public void downloadArtifactHappyScenarioTest() throws IOException { Map headers = new HashMap<>(); - headers.put(asdcClient.CONTENT_DISPOSITION_HEADER, "SomeHeader"); + headers.put(SdcConnectorClient.CONTENT_DISPOSITION_HEADER, "SomeHeader"); IArtifactInfo artifactInfo = mock(IArtifactInfo.class); when(artifactInfo.getArtifactURL()).thenReturn(ARTIFACT_URL); when(artifactInfo.getArtifactChecksum()).thenReturn(Hashing.md5().hashBytes(BYTES).toString()); - HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class); + HttpSdcResponse responseMock = mock(HttpSdcResponse.class); HttpEntity messageMock = mock(HttpEntity.class); - Pair responsePair = new Pair<>(responseMock, null); + Pair responsePair = new Pair<>(responseMock, null); when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK); when(responseMock.getMessage()).thenReturn(messageMock); @@ -303,70 +236,62 @@ class SdcConnectorClientTest { when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(BYTES)); doReturn(responsePair).when(httpClient).getRequest(eq(ARTIFACT_URL), any(), eq(false)); - IDistributionClientResult result = asdcClient.downloadArtifact(artifactInfo); + IDistributionClientResult result = sdcClient.downloadArtifact(artifactInfo); assertEquals(DistributionActionResultEnum.SUCCESS, result.getDistributionActionResult()); } @Test - void downloadArtifactDataIntegrityProblemTest() throws IOException { + public void downloadArtifactDataIntegrityProblemTest() throws IOException { IArtifactInfo artifactInfo = mock(IArtifactInfo.class); when(artifactInfo.getArtifactURL()).thenReturn(ARTIFACT_URL); - HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class); + HttpSdcResponse responseMock = mock(HttpSdcResponse.class); HttpEntity messageMock = mock(HttpEntity.class); - Pair responsePair = new Pair<>(responseMock, null); + Pair responsePair = new Pair<>(responseMock, null); when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK); when(responseMock.getMessage()).thenReturn(messageMock); when(messageMock.getContent()).thenReturn(new ByteArrayInputStream(BYTES)); doReturn(responsePair).when(httpClient).getRequest(eq(ARTIFACT_URL), any(), eq(false)); - IDistributionClientResult result = asdcClient.downloadArtifact(artifactInfo); + IDistributionClientResult result = sdcClient.downloadArtifact(artifactInfo); assertEquals(DistributionActionResultEnum.DATA_INTEGRITY_PROBLEM, result.getDistributionActionResult()); } @Test - void downloadArtifactExceptionDuringDownloadHandlingTest() throws IOException { + public void downloadArtifactExceptionDuringDownloadHandlingTest() throws IOException { IArtifactInfo artifactInfo = mock(IArtifactInfo.class); when(artifactInfo.getArtifactURL()).thenReturn(ARTIFACT_URL); - HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class); + HttpSdcResponse responseMock = mock(HttpSdcResponse.class); HttpEntity messageMock = mock(HttpEntity.class); - Pair responsePair = new Pair<>(responseMock, null); + Pair responsePair = new Pair<>(responseMock, null); when(responseMock.getStatus()).thenReturn(HttpStatus.SC_OK); when(responseMock.getMessage()).thenReturn(messageMock); when(messageMock.getContent()).thenReturn(new ThrowingInputStreamForTesting()); doReturn(responsePair).when(httpClient).getRequest(eq(ARTIFACT_URL), any(), eq(false)); - IDistributionClientResult result = asdcClient.downloadArtifact(artifactInfo); + IDistributionClientResult result = sdcClient.downloadArtifact(artifactInfo); assertEquals(DistributionActionResultEnum.GENERAL_ERROR, result.getDistributionActionResult()); } @Test - void downloadArtifactHandleDownloadErrorTest() throws IOException { + public void downloadArtifactHandleDownloadErrorTest() throws IOException { IArtifactInfo artifactInfo = mock(IArtifactInfo.class); when(artifactInfo.getArtifactURL()).thenReturn(ARTIFACT_URL); - HttpAsdcResponse responseMock = mock(HttpAsdcResponse.class); + HttpSdcResponse responseMock = mock(HttpSdcResponse.class); HttpEntity messageMock = mock(HttpEntity.class); - Pair responsePair = new Pair<>(responseMock, null); + Pair responsePair = new Pair<>(responseMock, null); when(responseMock.getStatus()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR); when(responseMock.getMessage()).thenReturn(messageMock); when(messageMock.getContent()).thenReturn(new ThrowingInputStreamForTesting()); doReturn(responsePair).when(httpClient).getRequest(eq(ARTIFACT_URL), any(), eq(false)); - IDistributionClientResult result = asdcClient.downloadArtifact(artifactInfo); - assertEquals(DistributionActionResultEnum.ASDC_SERVER_PROBLEM, result.getDistributionActionResult()); - } - - private String excpectedStringBody(boolean isConsumeProduceStatusTopic) { - String stringBodyTemplate = - "{\r\n" + " \"apiPublicKey\": \"MockApikey\",\r\n" + " \"distrEnvName\": \"MockEnv\",\r\n" - + " \"isConsumerToSdcDistrStatusTopic\": %s\r\n" + "}"; - return String.format(stringBodyTemplate, isConsumeProduceStatusTopic); - + IDistributionClientResult result = sdcClient.downloadArtifact(artifactInfo); + assertEquals(DistributionActionResultEnum.SDC_SERVER_PROBLEM, result.getDistributionActionResult()); } static class ThrowingInputStreamForTesting extends InputStream { @@ -376,4 +301,4 @@ class SdcConnectorClientTest { throw new IOException("Not implemented. This is expected as the implementation is for unit tests only."); } } -} +} \ No newline at end of file diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/impl/DistributionClientTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/impl/DistributionClientTest.java index 0297918..7354c6c 100644 --- a/sdc-distribution-client/src/test/java/org/onap/sdc/impl/DistributionClientTest.java +++ b/sdc-distribution-client/src/test/java/org/onap/sdc/impl/DistributionClientTest.java @@ -22,29 +22,29 @@ package org.onap.sdc.impl; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; -import com.att.nsa.apiClient.credentials.ApiCredential; -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.cambria.client.CambriaClient.CambriaApiException; -import com.att.nsa.cambria.client.CambriaIdentityManager; import fj.data.Either; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Assertions; +import org.junitpioneer.jupiter.SetEnvironmentVariable; import org.mockito.Mockito; import org.onap.sdc.api.IDistributionClient; import org.onap.sdc.api.consumer.IConfiguration; import org.onap.sdc.api.notification.IArtifactInfo; import org.onap.sdc.api.notification.IVfModuleMetadata; import org.onap.sdc.api.results.IDistributionClientResult; -import org.onap.sdc.http.HttpAsdcClient; +import org.onap.sdc.http.HttpSdcClient; import org.onap.sdc.http.SdcConnectorClient; -import org.onap.sdc.http.TopicRegistrationResponse; import org.onap.sdc.utils.ArtifactTypeEnum; import org.onap.sdc.utils.ArtifactsUtils; import org.onap.sdc.utils.DistributionActionResultEnum; @@ -52,13 +52,14 @@ import org.onap.sdc.utils.Pair; import org.onap.sdc.utils.TestConfiguration; import org.onap.sdc.utils.TestNotificationCallback; import org.onap.sdc.utils.Wrapper; +import org.onap.sdc.utils.kafka.KafkaDataResponse; +import org.onap.sdc.utils.kafka.SdcKafkaConsumer; class DistributionClientTest { - - static CambriaIdentityManager cc; DistributionClientImpl client = Mockito.spy(new DistributionClientImpl()); IConfiguration testConfiguration = new TestConfiguration(); SdcConnectorClient connector = Mockito.mock(SdcConnectorClient.class); + SdcKafkaConsumer consumer = mock(SdcKafkaConsumer.class); @AfterEach @@ -68,8 +69,8 @@ class DistributionClientTest { @Test void validateConfigurationTest() { - final Pair distributionActionResultEnumConfigurationPair = client.validateAndInitConfiguration( - new Wrapper(), testConfiguration); + final Pair distributionActionResultEnumConfigurationPair = client.validateAndInitConfiguration( + new Wrapper<>(), testConfiguration); DistributionActionResultEnum validationResult = distributionActionResultEnumConfigurationPair.getFirst(); Configuration configuration = distributionActionResultEnumConfigurationPair.getSecond(); assertEquals(DistributionActionResultEnum.SUCCESS, validationResult); @@ -82,7 +83,7 @@ class DistributionClientTest { TestConfiguration userConfig = new TestConfiguration(); userConfig.setPollingInterval(1); userConfig.setPollingTimeout(2); - final Pair distributionActionResultEnumConfigurationPair = client.validateAndInitConfiguration( + final Pair distributionActionResultEnumConfigurationPair = client.validateAndInitConfiguration( new Wrapper<>(), userConfig); DistributionActionResultEnum validationResult = distributionActionResultEnumConfigurationPair.getFirst(); Configuration configuration = distributionActionResultEnumConfigurationPair.getSecond(); @@ -120,23 +121,8 @@ class DistributionClientTest { } @Test - void initWithMocksBadConfigurationTest() throws HttpException, CambriaApiException, IOException { - - TopicRegistrationResponse topics = new TopicRegistrationResponse(); - topics.setDistrNotificationTopicName("notificationTopic"); - topics.setDistrStatusTopicName("statusTopic"); - Either topicsResult = Either.left(topics); - Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult); - - reconfigureAsdcConnector(connector, client); - - // cambriaMock - - CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class); - Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class))) - .thenReturn(new ApiCredential("public", "secret")); - client.cambriaIdentityManager = cambriaMock; - + public void initWithMocksBadConfigurationTest() { + reconfigureSdcConnector(connector, client); // no password TestConfiguration testPassword = new TestConfiguration(); testPassword.setPassword(null); @@ -157,19 +143,19 @@ class DistributionClientTest { validationResult = client.init(testUser, new TestNotificationCallback()); assertEquals(DistributionActionResultEnum.CONF_MISSING_USERNAME, validationResult.getDistributionActionResult()); - // no ASDC server fqdn + // no SDC server fqdn TestConfiguration testServerFqdn = new TestConfiguration(); - testServerFqdn.setAsdcAddress(null); + testServerFqdn.setSdcAddress(null); validationResult = client.init(testServerFqdn, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN, validationResult.getDistributionActionResult()); + assertEquals(DistributionActionResultEnum.CONF_MISSING_SDC_FQDN, validationResult.getDistributionActionResult()); - testServerFqdn.setAsdcAddress(""); + testServerFqdn.setSdcAddress(""); validationResult = client.init(testServerFqdn, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN, validationResult.getDistributionActionResult()); + assertEquals(DistributionActionResultEnum.CONF_MISSING_SDC_FQDN, validationResult.getDistributionActionResult()); - testServerFqdn.setAsdcAddress("this##is##bad##fqdn"); + testServerFqdn.setSdcAddress("this##is##bad##fqdn"); validationResult = client.init(testServerFqdn, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.CONF_INVALID_ASDC_FQDN, validationResult.getDistributionActionResult()); + assertEquals(DistributionActionResultEnum.CONF_INVALID_SDC_FQDN, validationResult.getDistributionActionResult()); // no consumerId TestConfiguration testConsumerId = new TestConfiguration(); @@ -191,93 +177,63 @@ class DistributionClientTest { validationResult = client.init(testEnv, new TestNotificationCallback()); assertEquals(DistributionActionResultEnum.CONF_MISSING_ENVIRONMENT_NAME, validationResult.getDistributionActionResult()); - Mockito.verify(client, Mockito.times(0)).getUEBServerList(); - Mockito.verify(cambriaMock, Mockito.times(0)).createApiKey(Mockito.anyString(), Mockito.anyString()); - Mockito.verify(connector, Mockito.times(0)).registerAsdcTopics(Mockito.any(ApiCredential.class)); } - private void reconfigureAsdcConnector(SdcConnectorClient connector, DistributionClientImpl client) { - doReturn(connector).when(client).createAsdcConnector(any()); + private void reconfigureSdcConnector(SdcConnectorClient connector, DistributionClientImpl client) { + doReturn(connector).when(client).createSdcConnector(any()); } @Test - void initFailedConnectAsdcTest() throws HttpException, CambriaApiException, IOException { - // cambriaMock - - CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class); - Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class))) - .thenReturn(new ApiCredential("public", "secret")); - client.cambriaIdentityManager = cambriaMock; + public void initFailedConnectSdcTest() { - TestConfiguration badAsdcConfig = new TestConfiguration(); - if (badAsdcConfig.isUseHttpsWithSDC() == null) { + TestConfiguration badSdcConfig = new TestConfiguration(); + if (badSdcConfig.isUseHttpsWithSDC() == null) { System.out.println("null for HTTPS then TRUE"); } else { - System.out.println("isUseHttpsWithSDC set to " + badAsdcConfig.isUseHttpsWithSDC()); + System.out.println("isUseHttpsWithSDC set to " + badSdcConfig.isUseHttpsWithSDC()); } - badAsdcConfig.setAsdcAddress("badhost:8080"); + badSdcConfig.setSdcAddress("badhost:8080"); - IDistributionClientResult init = client.init(badAsdcConfig, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, init.getDistributionActionResult()); + IDistributionClientResult init = client.init(badSdcConfig, new TestNotificationCallback()); + assertEquals(DistributionActionResultEnum.SDC_CONNECTION_FAILED, init.getDistributionActionResult()); - badAsdcConfig = new TestConfiguration(); - badAsdcConfig.setAsdcAddress("localhost:8181"); + badSdcConfig = new TestConfiguration(); + badSdcConfig.setSdcAddress("localhost:8181"); - init = client.init(badAsdcConfig, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, init.getDistributionActionResult()); + init = client.init(badSdcConfig, new TestNotificationCallback()); + assertEquals(DistributionActionResultEnum.SDC_CONNECTION_FAILED, init.getDistributionActionResult()); } @Test - void initFailedConnectAsdcInHttpTest() throws HttpException, CambriaApiException, IOException { - // cambriaMock + public void initFailedConnectSdcInHttpTest() { - CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class); - Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class))) - .thenReturn(new ApiCredential("public", "secret")); - client.cambriaIdentityManager = cambriaMock; + TestConfiguration badSdcConfig = new TestConfiguration(); + badSdcConfig.setSdcAddress("badhost:8080"); + badSdcConfig.setUseHttpsWithSDC(false); - TestConfiguration badAsdcConfig = new TestConfiguration(); - badAsdcConfig.setAsdcAddress("badhost:8080"); - badAsdcConfig.setUseHttpsWithSDC(false); + IDistributionClientResult init = client.init(badSdcConfig, new TestNotificationCallback()); + assertEquals(DistributionActionResultEnum.SDC_CONNECTION_FAILED, init.getDistributionActionResult()); - IDistributionClientResult init = client.init(badAsdcConfig, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, init.getDistributionActionResult()); + badSdcConfig = new TestConfiguration(); + badSdcConfig.setSdcAddress("localhost:8181"); + badSdcConfig.setUseHttpsWithSDC(false); - badAsdcConfig = new TestConfiguration(); - badAsdcConfig.setAsdcAddress("localhost:8181"); - badAsdcConfig.setUseHttpsWithSDC(false); - - init = client.init(badAsdcConfig, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.ASDC_CONNECTION_FAILED, init.getDistributionActionResult()); + init = client.init(badSdcConfig, new TestNotificationCallback()); + assertEquals(DistributionActionResultEnum.SDC_CONNECTION_FAILED, init.getDistributionActionResult()); } @Test - void getConfigurationTest() throws HttpException, CambriaApiException, IOException { + public void getConfigurationTest() { // connectorMock mockArtifactTypeList(); - TopicRegistrationResponse topics = new TopicRegistrationResponse(); - topics.setDistrNotificationTopicName("notificationTopic"); - topics.setDistrStatusTopicName("statusTopic"); - Either topicsResult = Either.left(topics); - Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult); - IDistributionClientResult success = initSuccesResult(); - Mockito.when(connector.unregisterTopics(Mockito.any(ApiCredential.class))).thenReturn(success); - - reconfigureAsdcConnector(connector, client); + mockKafkaData(); + reconfigureSdcConnector(connector, client); + TestConfiguration badSdcConfig = new TestConfiguration(); + badSdcConfig.setPollingInterval(-5); - // cambriaMock - - CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class); - Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class))) - .thenReturn(new ApiCredential("public", "secret")); - client.cambriaIdentityManager = cambriaMock; - - TestConfiguration badAsdcConfig = new TestConfiguration(); - badAsdcConfig.setPollingInterval(-5); - - IDistributionClientResult init = client.init(badAsdcConfig, new TestNotificationCallback()); + IDistributionClientResult init = client.init(badSdcConfig, new TestNotificationCallback()); assertEquals(DistributionActionResultEnum.SUCCESS, init.getDistributionActionResult()); String confString = client.getConfiguration().toString(); @@ -301,32 +257,12 @@ class DistributionClientTest { } @Test - void initWithMocksTest() throws HttpException, CambriaApiException, IOException { - + public void initWithMocksTest() { mockArtifactTypeList(); - - TopicRegistrationResponse topics = new TopicRegistrationResponse(); - topics.setDistrNotificationTopicName("notificationTopic"); - topics.setDistrStatusTopicName("statusTopic"); - Either topicsResult = Either.left(topics); - Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult); - IDistributionClientResult success = initSuccesResult(); - Mockito.when(connector.unregisterTopics(Mockito.any(ApiCredential.class))).thenReturn(success); - - reconfigureAsdcConnector(connector, client); - - // cambriaMock - - CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class); - Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class))) - .thenReturn(new ApiCredential("public", "secret")); - client.cambriaIdentityManager = cambriaMock; - + mockKafkaData(); + reconfigureSdcConnector(connector, client); IDistributionClientResult initResponse = client.init(testConfiguration, new TestNotificationCallback()); assertEquals(DistributionActionResultEnum.SUCCESS, initResponse.getDistributionActionResult()); - Mockito.verify(client, Mockito.times(1)).getUEBServerList(); - Mockito.verify(cambriaMock, Mockito.times(1)).createApiKey(Mockito.anyString(), Mockito.anyString()); - Mockito.verify(connector, Mockito.times(1)).registerAsdcTopics(Mockito.any(ApiCredential.class)); System.out.println(initResponse); } @@ -340,137 +276,31 @@ class DistributionClientTest { Mockito.when(connector.getValidArtifactTypesList()).thenReturn(eitherArtifactTypes); } - @Test - void testAlreadyInitTest() throws HttpException, CambriaApiException, IOException { - initWithMocksTest(); - IDistributionClientResult initResponse = client.init(testConfiguration, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, initResponse.getDistributionActionResult()); + private void mockKafkaData() { + KafkaDataResponse kafkaData = new KafkaDataResponse(); + kafkaData.setKafkaBootStrapServer("localhost:9092"); + kafkaData.setDistrNotificationTopicName("SDC-DISTR-NOTIF-TOPIC-AUTO"); + kafkaData.setDistrStatusTopicName("SDC-DISTR-STATUS-TOPIC-AUTO"); + final Either eitherArtifactTypes = Either.left(kafkaData); + Mockito.when(connector.getKafkaDistData()).thenReturn(eitherArtifactTypes); } @Test - void initGetServerFailedTest() throws HttpException, CambriaApiException, IOException { - - // connectorMock - IDistributionClientResult getServersResult = new DistributionClientResultImpl(DistributionActionResultEnum.ASDC_SERVER_PROBLEM, "problem"); - Either, IDistributionClientResult> serversResult = Either.right(getServersResult); - doReturn(serversResult).when(client).getUEBServerList(); - - TopicRegistrationResponse topics = new TopicRegistrationResponse(); - topics.setDistrNotificationTopicName("notificationTopic"); - topics.setDistrStatusTopicName("statusTopic"); - Either topicsResult = Either.left(topics); - Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult); - - reconfigureAsdcConnector(connector, client); - - // cambriaMock - - CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class); - Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class))) - .thenReturn(new ApiCredential("public", "secret")); - client.cambriaIdentityManager = cambriaMock; - + void testAlreadyInitTest() { + initWithMocksTest(); IDistributionClientResult initResponse = client.init(testConfiguration, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.ASDC_SERVER_PROBLEM, initResponse.getDistributionActionResult()); - - Mockito.verify(client, Mockito.times(1)).getUEBServerList(); - Mockito.verify(cambriaMock, Mockito.times(0)).createApiKey(Mockito.anyString(), Mockito.anyString()); - Mockito.verify(connector, Mockito.times(0)).registerAsdcTopics(Mockito.any(ApiCredential.class)); - - System.out.println(initResponse); - } - - @Test - void initCreateKeysFailedTest() throws HttpException, CambriaApiException, IOException { - - // connectorMock - mockArtifactTypeList(); - - TopicRegistrationResponse topics = new TopicRegistrationResponse(); - topics.setDistrNotificationTopicName("notificationTopic"); - topics.setDistrStatusTopicName("statusTopic"); - Either topicsResult = Either.left(topics); - Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult); - - reconfigureAsdcConnector(connector, client); - - // cambriaMock - - CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class); - Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class))).thenThrow(new CambriaApiException("failure")); - client.cambriaIdentityManager = cambriaMock; - - IDistributionClientResult initResponse = client.init(testConfiguration, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, initResponse.getDistributionActionResult()); - - Mockito.verify(client, Mockito.times(1)).getUEBServerList(); - Mockito.verify(cambriaMock, Mockito.times(1)).createApiKey(Mockito.anyString(), Mockito.anyString()); - Mockito.verify(connector, Mockito.times(0)).registerAsdcTopics(Mockito.any(ApiCredential.class)); - System.out.println(initResponse); - } - - @Test - void initRegistrationFailedTest() throws HttpException, CambriaApiException, IOException { - - // connectorMock - mockArtifactTypeList(); - DistributionClientResultImpl failureResult = new DistributionClientResultImpl(DistributionActionResultEnum.BAD_REQUEST, "Bad Request"); - Either topicsResult = Either.right(failureResult); - Mockito.when(connector.registerAsdcTopics(Mockito.any(ApiCredential.class))).thenReturn(topicsResult); - - reconfigureAsdcConnector(connector, client); - - // cambriaMock - - CambriaIdentityManager cambriaMock = Mockito.mock(CambriaIdentityManager.class); - Mockito.when(cambriaMock.createApiKey(Mockito.any(String.class), Mockito.any(String.class))) - .thenReturn(new ApiCredential("public", "secret")); - client.cambriaIdentityManager = cambriaMock; - - IDistributionClientResult initResponse = client.init(testConfiguration, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.BAD_REQUEST, initResponse.getDistributionActionResult()); - Mockito.verify(client, Mockito.times(1)).getUEBServerList(); - Mockito.verify(cambriaMock, Mockito.times(1)).createApiKey(Mockito.anyString(), Mockito.anyString()); - Mockito.verify(connector, Mockito.times(1)).registerAsdcTopics(Mockito.any(ApiCredential.class)); - System.out.println(initResponse); + assertEquals(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, initResponse.getDistributionActionResult()); } @Test void testStartWithoutInit() { - IDistributionClientResult result = client.start(); - assertEquals(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, result.getDistributionActionResult()); - } - - private IArtifactInfo initArtifactInfo() { - ArtifactInfoImpl artifactInfo = new ArtifactInfoImpl(); - artifactInfo.setArtifactURL("/sdc/v1/services/serviceName/0.1/artifacts/aaa.hh"); - artifactInfo.setArtifactChecksum(ArtifactsUtils.getValidChecksum()); - return artifactInfo; - } - - // ########### TESTS TO ADD TO CI START ########### - /*public void createKeysTestCI() throws MalformedURLException, GeneralSecurityException { - validateConfigurationTest(); - CambriaIdentityManager trueCambria = new CambriaClientBuilders.IdentityManagerBuilder().usingHttps().usingHosts(serverList).build(); - client.cambriaIdentityManager = trueCambria; - DistributionClientResultImpl keysResult = client.createUebKeys(); - assertEquals(DistributionActionResultEnum.SUCCESS, keysResult.getDistributionActionResult()); - assertFalse(client.credential.getApiKey().isEmpty()); - assertFalse(client.credential.getApiSecret().isEmpty()); - - System.out.println(keysResult); - System.out.println("keys: public=" + client.credential.getApiKey() + " | secret=" + client.credential.getApiSecret()); - } -*/ - public void initTestCI() { - IDistributionClient distributionClient = DistributionClientFactory.createDistributionClient(); - IDistributionClientResult init = distributionClient.init(testConfiguration, new TestNotificationCallback()); - assertEquals(DistributionActionResultEnum.SUCCESS, init.getDistributionActionResult()); - + IDistributionClientResult result = client.start(); + assertSame(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, + result.getDistributionActionResult()); } @Test - void testDecodeVfModuleArtifact() throws IOException { + void testDecodeVfModuleArtifact() { String vfModuleContent = getVFModuleExample(); List decodeVfModuleArtifact = client.decodeVfModuleArtifact(vfModuleContent.getBytes()); assertEquals(1, decodeVfModuleArtifact.size()); @@ -504,23 +334,4 @@ class DistributionClientTest { " }\r\n" + "]"; } - - - public void connectorRegisterCI() { - SdcConnectorClient connector = new SdcConnectorClient(testConfiguration, new HttpAsdcClient(testConfiguration)); - - ApiCredential creds = new ApiCredential("publicKey", "secretKey"); - Either topicsFromAsdc = connector.registerAsdcTopics(creds); - assertTrue(topicsFromAsdc.isLeft()); - - } - - public void downloadArtifactTestCI() { - SdcConnectorClient connector = new SdcConnectorClient(testConfiguration, new HttpAsdcClient(testConfiguration)); - IArtifactInfo artifactInfo = initArtifactInfo(); - connector.downloadArtifact(artifactInfo); - - } - // ########### TESTS TO ADD TO CI END ########### - } diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/impl/NotificationConsumerTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/impl/NotificationConsumerTest.java index 1e3db81..a70a537 100644 --- a/sdc-distribution-client/src/test/java/org/onap/sdc/impl/NotificationConsumerTest.java +++ b/sdc-distribution-client/src/test/java/org/onap/sdc/impl/NotificationConsumerTest.java @@ -21,30 +21,20 @@ package org.onap.sdc.impl; -import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -import com.att.nsa.cambria.client.CambriaConsumer; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.awaitility.Durations; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -56,359 +46,297 @@ import org.onap.sdc.utils.ArtifactTypeEnum; import org.onap.sdc.utils.DistributionActionResultEnum; import org.onap.sdc.utils.DistributionClientConstants; import org.onap.sdc.utils.TestConfiguration; +import org.onap.sdc.utils.kafka.SdcKafkaConsumer; class NotificationConsumerTest { - - final static IDistributionClientResult DISTRIBUTION_SUCCESS_RESULT = buildSuccessResult(); - private final CambriaConsumer cambriaConsumer = mock(CambriaConsumer.class); - private final INotificationCallback clientCallback = spy(INotificationCallback.class); - private final Queue> notificationsQueue = new LinkedList<>(); - private final DistributionClientImpl distributionClient = Mockito.spy(DistributionClientImpl.class); - private List artifactsTypes = List.of(ArtifactTypeEnum.HEAT.name()); - private final List notificationStatusResults = new ArrayList<>(); - - private static IDistributionClientResult buildSuccessResult() { - return new IDistributionClientResult() { - - @Override - public String getDistributionMessageResult() { - return ""; - } - - @Override - public DistributionActionResultEnum getDistributionActionResult() { - return DistributionActionResultEnum.SUCCESS; - } - }; - } - - private NotificationConsumer createNotificationConsumer() { - return new NotificationConsumer(cambriaConsumer, clientCallback, artifactsTypes, distributionClient); - } - - @BeforeEach - public void beforeTest() throws IOException { - Mockito.reset(clientCallback, distributionClient); - when(cambriaConsumer.fetch()).then((Answer>) invocation -> { - if (!notificationsQueue.isEmpty()) { - return notificationsQueue.remove(); - } else { - return new ArrayList<>(); - } - }); - when(distributionClient.sendNotificationStatus(anyLong(), anyString(), any(ArtifactInfoImpl.class), anyBoolean())) - .then((Answer) invocation -> { - boolean isNotified = (boolean) invocation.getArguments()[3]; - notificationStatusResults.add(isNotified); - return DISTRIBUTION_SUCCESS_RESULT; - }); - - } - - @Test - void testNoNotificationsSent() { - ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); - ScheduledFuture scheduledFuture = executorPool.scheduleAtFixedRate(createNotificationConsumer(), 0, 100, TimeUnit.MILLISECONDS); - await().atMost(Durations.ONE_SECOND).until(() -> !scheduledFuture.isDone()); - executorPool.shutdown(); - - Mockito.verify(clientCallback, Mockito.times(0)).activateCallback(any(INotificationData.class)); - } - - @Test - void testNonRelevantNotificationSent() throws InterruptedException { - - simulateNotificationFromUEB(getAsdcServiceNotificationWithoutHeatArtifact()); - Mockito.verify(clientCallback, Mockito.times(0)).activateCallback(any(INotificationData.class)); - } - - @Test - void testRelevantNotificationSent() throws InterruptedException { - simulateNotificationFromUEB(getAsdcServiceNotificationWithHeatArtifact()); - Mockito.verify(clientCallback, Mockito.times(1)).activateCallback(any(INotificationData.class)); - } - - @Test - void testNonExistingArtifactsNotificationSent() throws InterruptedException { - simulateNotificationFromUEB(getAsdcNotificationWithNonExistentArtifact()); - Mockito.verify(clientCallback, Mockito.times(1)).activateCallback(any(INotificationData.class)); - } - - @Test - void testNotificationStatusSent() throws InterruptedException { - simulateNotificationFromUEB(getAsdcServiceNotificationWithHeatArtifact()); - - Mockito.verify(distributionClient, Mockito.times(3)) - .sendNotificationStatus(anyLong(), anyString(), any(ArtifactInfoImpl.class), anyBoolean()); - assertEquals(1, countInstances(notificationStatusResults, Boolean.TRUE)); - assertEquals(2, countInstances(notificationStatusResults, Boolean.FALSE)); - } - - @Test - void testNotificationRelatedArtifacts() throws InterruptedException { - List artifactTypesTmp = new ArrayList<>(); - for (ArtifactTypeEnum artifactTypeEnum : ArtifactTypeEnum.values()) { - artifactTypesTmp.add(artifactTypeEnum.name()); - } - artifactsTypes = artifactTypesTmp; - simulateNotificationFromUEB(getAsdcServiceNotificationWithRelatedArtifacts()); - - Mockito.verify(distributionClient, Mockito.times(3)) - .sendNotificationStatus(anyLong(), anyString(), any(ArtifactInfoImpl.class), anyBoolean()); - assertEquals(3, countInstances(notificationStatusResults, Boolean.TRUE)); - assertEquals(0, countInstances(notificationStatusResults, Boolean.FALSE)); - } - - @Test - void testNotificationStatusWithServiceArtifatcs() throws InterruptedException { - simulateNotificationFromUEB(getNotificationWithServiceArtifatcs()); - Mockito.verify(distributionClient, Mockito.times(6)) - .sendNotificationStatus(anyLong(), anyString(), any(ArtifactInfoImpl.class), anyBoolean()); - assertEquals(2, countInstances(notificationStatusResults, Boolean.TRUE)); - assertEquals(4, countInstances(notificationStatusResults, Boolean.FALSE)); - - } - - @Test - final void testBuildCallbackNotificationLogicFlagIsFalse() { - NotificationConsumer consumer = createNotificationConsumer(); - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - TestConfiguration testConfiguration = new TestConfiguration(); - testConfiguration.setFilterInEmptyResources(false); - when(distributionClient.getConfiguration()).thenReturn(testConfiguration); - NotificationDataImpl notification = gson.fromJson(getNotificationWithMultipleResources(), NotificationDataImpl.class); - NotificationDataImpl notificationBuiltInClient = consumer.buildCallbackNotificationLogic(0, notification); - assertEquals(1, notificationBuiltInClient.getResources().size()); - } - - @Test - final void testBuildCallbackNotificationLogicFlagIsTrue() { - NotificationConsumer consumer = createNotificationConsumer(); - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - TestConfiguration testConfiguration = new TestConfiguration(); - testConfiguration.setFilterInEmptyResources(true); - when(distributionClient.getConfiguration()).thenReturn(testConfiguration); - NotificationDataImpl notification = gson.fromJson(getNotificationWithMultipleResources(), NotificationDataImpl.class); - NotificationDataImpl notificationBuiltInClient = consumer.buildCallbackNotificationLogic(0, notification); - assertEquals(2, notificationBuiltInClient.getResources().size()); - } - - private void simulateNotificationFromUEB(final String notificationFromUEB) throws InterruptedException { - ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); - ScheduledFuture scheduledFuture = executorPool.scheduleAtFixedRate(createNotificationConsumer(), 0, 100, TimeUnit.MILLISECONDS); - - await().atMost(Durations.TWO_HUNDRED_MILLISECONDS).until(() -> !scheduledFuture.isDone()); - - List nonHeatNotification = Collections.singletonList(notificationFromUEB); - notificationsQueue.add(nonHeatNotification); - Thread.sleep(800); - executorPool.shutdown(); - } - - private String getAsdcServiceNotificationWithHeatArtifact() { - return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" - + " \"serviceVersion\" : \"1.0\",\r\n" - + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" - + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" - + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n" - + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" - + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" - + " \"artifactType\" : \"YANG_XML\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n" - + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" - + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n" - + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" - + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n" - + " \"artifactType\" : \"HEAT\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" - + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" - + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n" - + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" - + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n" - + " \"artifactType\" : \"HEAT_ENV\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n" - + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" - + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n" - + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" - + " \"artifactVersion\" : \"1\",\r\n" - + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" - + " }\r\n" + " ]}"; - } - - private String getNotificationWithMultipleResources() { - return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + - " \"serviceName\" : \"Testnotificationser1\",\r\n" + - " \"serviceVersion\" : \"1.0\",\r\n" + - " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + - " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" + - " \"resources\" : [{\r\n" + - " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" + - " \"resourceName\" : \"TestNotificationVF1\",\r\n" + - " \"resourceVersion\" : \"1.0\",\r\n" + - " \"resoucreType\" : \"VF\",\r\n" + - " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" + - " \"artifacts\" : [{\r\n" + - " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" + - " \"artifactType\" : \"YANG_XML\",\r\n" + - " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n" - + - " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" + - " \"artifactDescription\" : \"MyYang\",\r\n" + - " \"artifactTimeout\" : 0,\r\n" + - " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + - " \"artifactVersion\" : \"1\"\r\n" + - " }" + - " ]\r\n" + - " },\r\n" + - " {\r\n" + - " \"resourceInstanceName\" : \"testnotificationvf12\",\r\n" + - " \"resourceName\" : \"TestNotificationVF1\",\r\n" + - " \"resourceVersion\" : \"1.0\",\r\n" + - " \"resoucreType\" : \"VF\",\r\n" + - " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2e\",\r\n" + - " \"artifacts\" : [{\r\n" + - " \"artifactName\" : \"heat.yaml\",\r\n" + - " \"artifactType\" : \"HEAT\",\r\n" + - " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" - + - " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + - " \"artifactDescription\" : \"heat\",\r\n" + - " \"artifactTimeout\" : 60,\r\n" + - " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + - " \"artifactVersion\" : \"1\"\r\n" + - " }" + - " ]\r\n" + - " }\r\n" + - " ]}"; - } - - - private String getAsdcNotificationWithNonExistentArtifact() { - return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" - + " \"serviceVersion\" : \"1.0\",\r\n" - + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" - + " \"bugabuga\" : \"xyz\",\r\n" + " \"resources\" : [{\r\n" - + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" + " \"resourceName\" : \"TestNotificationVF1\",\r\n" - + " \"resourceVersion\" : \"1.0\",\r\n" + " \"resoucreType\" : \"VF\",\r\n" - + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" + " \"artifacts\" : [{\r\n" - + " \"artifactName\" : \"heat.yaml\",\r\n" + " \"artifactType\" : \"HEAT\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" - + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" - + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n" - + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" - + " \"artifactBuga\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" - + " }, {\r\n" + " \"artifactName\" : \"buga.bug\",\r\n" + " \"artifactType\" : \"BUGA_BUGA\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n" - + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" - + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n" - + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" - + " \"artifactVersion\" : \"1\",\r\n" - + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" - + " }\r\n" + " ]}"; - } - - private String getAsdcServiceNotificationWithRelatedArtifacts() { - return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" - + " \"serviceVersion\" : \"1.0\",\r\n" - + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" - + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" - + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n" - + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" - + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" - + " \"artifactType\" : \"YANG_XML\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n" - + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" - + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n" - + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\",\r\n" - + " \"relatedArtifacts\" : [\r\n" - + " \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\"\r\n" + " ]" + " }, {\r\n" - + " \"artifactName\" : \"heat.yaml\",\r\n" - + " \"artifactType\" : \"HEAT\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" - + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" - + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n" - + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\", \r\n" - + " \"relatedArtifacts\" : [\r\n" - + " \"0005bc4a-2c19-452e-be6d-d574a56be4d0\", \r\n" + " \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\"\r\n" - + " ]" + " }, {\r\n" - + " \"artifactName\" : \"heat.env\",\r\n" + " \"artifactType\" : \"HEAT_ENV\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n" - + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" - + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n" - + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" - + " \"artifactVersion\" : \"1\",\r\n" - + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" - + " }\r\n" + " ]}"; - } - - private String getAsdcServiceNotificationWithoutHeatArtifact() { - return "{" + " \"distributionID\" : \"5v1234d8-5b6d-42c4-7t54-47v95n58qb7\"," + " \"serviceName\" : \"srv1\"," - + " \"serviceVersion\": \"2.0\"," + " \"serviceUUID\" : \"4e0697d8-5b6d-42c4-8c74-46c33d46624c\"," - + " \"serviceArtifacts\":[" + " {" + " \"artifactName\" : \"ddd.yml\"," - + " \"artifactType\" : \"DG_XML\"," + " \"artifactTimeout\" : \"65\"," - + " \"artifactDescription\" : \"description\"," + " \"artifactURL\" :" - + " \"/sdc/v1/catalog/services/srv1/2.0/resources/ddd/3.0/artifacts/ddd.xml\" ," - + " \"resourceUUID\" : \"4e5874d8-5b6d-42c4-8c74-46c33d90drw\" ," - + " \"checksum\" : \"15e389rnrp58hsw==\"" + " }" + " ]" + "}"; - } - - private String getNotificationWithServiceArtifatcs() { - return "{\r\n" + " \"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" - + " \"serviceVersion\" : \"1.0\",\r\n" - + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" - + " \"serviceArtifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" - + " \"artifactType\" : \"YANG_XML\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n" - + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" - + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n" - + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" - + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n" - + " \"artifactType\" : \"HEAT\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" - + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" - + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n" - + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" - + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n" - + " \"artifactType\" : \"HEAT_ENV\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n" - + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" - + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n" - + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" - + " \"artifactVersion\" : \"1\",\r\n" - + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ],\r\n" - + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" - + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n" - + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" - + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" - + " \"artifactType\" : \"YANG_XML\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n" - + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" - + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n" - + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" - + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n" - + " \"artifactType\" : \"HEAT\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" - + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" - + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n" - + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" - + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n" - + " \"artifactType\" : \"HEAT_ENV\",\r\n" - + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n" - + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" - + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n" - + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" - + " \"artifactVersion\" : \"1\",\r\n" - + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" + " }\r\n" - + " ]\r\n" + "}"; - } - - private int countInstances(List list, T element) { - int count = 0; - for (T curr : list) { - if (curr.equals(element)) { - count++; - } - } - return count; - } -} + private final SdcKafkaConsumer consumer = mock(SdcKafkaConsumer.class); + private final INotificationCallback clientCallback = spy(INotificationCallback.class); + private final Queue> notificationsQueue = new LinkedList<>(); + private final DistributionClientImpl distributionClient = Mockito.spy(DistributionClientImpl.class); + private List artifactsTypes = List.of(ArtifactTypeEnum.HEAT.name()); + private final List notificationStatusResults = new ArrayList<>(); + final static IDistributionClientResult DISTRIBUTION_SUCCESS_RESULT = buildSuccessResult(); + + private NotificationConsumer createNotificationConsumer() { + return new NotificationConsumer(consumer, clientCallback, artifactsTypes, distributionClient); + } + + @BeforeEach + public void beforeTest() { + Mockito.reset(clientCallback, distributionClient); + when(consumer.poll()).then((Answer>) invocation -> { + if (!notificationsQueue.isEmpty()) { + return notificationsQueue.remove(); + } else { + return new ArrayList<>(); + } + }); + when(distributionClient.sendNotificationStatus(Mockito.anyLong(), Mockito.anyString(), Mockito.any(ArtifactInfoImpl.class), Mockito.anyBoolean())).then( + (Answer) invocation -> { + boolean isNotified = (boolean) invocation.getArguments()[3]; + notificationStatusResults.add(isNotified); + return DISTRIBUTION_SUCCESS_RESULT; + }); + + } + + private static IDistributionClientResult buildSuccessResult() { + return new IDistributionClientResult() { + + @Override + public String getDistributionMessageResult() { + return ""; + } + + @Override + public DistributionActionResultEnum getDistributionActionResult() { + return DistributionActionResultEnum.SUCCESS; + } + }; + } + + @Test + void testNoNotifiactionsSent() throws InterruptedException { + + ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); + executorPool.scheduleAtFixedRate(createNotificationConsumer(), 0, 100, TimeUnit.MILLISECONDS); + + Thread.sleep(1000); + executorPool.shutdown(); + + Mockito.verify(clientCallback, Mockito.times(0)).activateCallback(Mockito.any(INotificationData.class)); + + } + + @Test + void testNonRelevantNotificationSent() throws InterruptedException { + + simulateNotificationFromMessageBus(getSdcServiceNotificationWithoutHeatArtifact()); + Mockito.verify(clientCallback, Mockito.times(0)).activateCallback(Mockito.any(INotificationData.class)); + + } + + @Test + void testRelevantNotificationSent() throws InterruptedException { + simulateNotificationFromMessageBus(getSdcServiceNotificationWithHeatArtifact()); + Mockito.verify(clientCallback, Mockito.times(1)).activateCallback(Mockito.any(INotificationData.class)); + + } + + @Test + void testNonExistingArtifactsNotificationSent() throws InterruptedException { + simulateNotificationFromMessageBus(getSdcNotificationWithNonExistentArtifact()); + Mockito.verify(clientCallback, Mockito.times(1)).activateCallback(Mockito.any(INotificationData.class)); + + } + + @Test + void testNotificationStatusSent() throws InterruptedException { + simulateNotificationFromMessageBus(getSdcServiceNotificationWithHeatArtifact()); + + Mockito.verify(distributionClient, Mockito.times(3)).sendNotificationStatus(Mockito.anyLong(), Mockito.anyString(), Mockito.any(ArtifactInfoImpl.class), Mockito.anyBoolean()); + assertEquals(1, countInstances(notificationStatusResults, Boolean.TRUE)); + assertEquals(2, countInstances(notificationStatusResults, Boolean.FALSE)); + } + + @Test + void testNotificationRelatedArtifacts() throws InterruptedException { + List artifactTypesTmp = new ArrayList<>(); + for (ArtifactTypeEnum artifactTypeEnum : ArtifactTypeEnum.values()) { + artifactTypesTmp.add(artifactTypeEnum.name()); + } + artifactsTypes = artifactTypesTmp; + simulateNotificationFromMessageBus(getSdcServiceNotificationWithRelatedArtifacts()); + + Mockito.verify(distributionClient, Mockito.times(3)).sendNotificationStatus(Mockito.anyLong(), Mockito.anyString(), Mockito.any(ArtifactInfoImpl.class), Mockito.anyBoolean()); + assertEquals(3, countInstances(notificationStatusResults, Boolean.TRUE)); + assertEquals(0, countInstances(notificationStatusResults, Boolean.FALSE)); + } + + @Test + void testNotificationStatusWithServiceArtifatcs() throws InterruptedException { + simulateNotificationFromMessageBus(getNotificationWithServiceArtifatcs()); + Mockito.verify(distributionClient, Mockito.times(6)).sendNotificationStatus(Mockito.anyLong(), Mockito.anyString(), Mockito.any(ArtifactInfoImpl.class), Mockito.anyBoolean()); + assertEquals(2, countInstances(notificationStatusResults, Boolean.TRUE)); + assertEquals(4, countInstances(notificationStatusResults, Boolean.FALSE)); + + } + + @Test + final void testBuildCallbackNotificationLogicFlagIsFalse() { + NotificationConsumer consumer = createNotificationConsumer(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + TestConfiguration testConfiguration = new TestConfiguration(); + testConfiguration.setFilterInEmptyResources(false); + when(distributionClient.getConfiguration()).thenReturn(testConfiguration); + NotificationDataImpl notification = gson.fromJson(getNotificationWithMultipleResources(), NotificationDataImpl.class); + NotificationDataImpl notificationBuiltInClient = consumer.buildCallbackNotificationLogic(0, notification); + assertEquals(1, notificationBuiltInClient.getResources().size()); + } + + @Test + final void testBuildCallbackNotificationLogicFlagIsTrue() { + NotificationConsumer consumer = createNotificationConsumer(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + TestConfiguration testConfiguration = new TestConfiguration(); + testConfiguration.setFilterInEmptyResources(true); + when(distributionClient.getConfiguration()).thenReturn(testConfiguration); + NotificationDataImpl notification = gson.fromJson(getNotificationWithMultipleResources(), NotificationDataImpl.class); + NotificationDataImpl notificationBuiltInClient = consumer.buildCallbackNotificationLogic(0, notification); + assertEquals(2, notificationBuiltInClient.getResources().size()); + } + + private void simulateNotificationFromMessageBus(final String notificationFromMessageBus) throws InterruptedException { + ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); + executorPool.scheduleAtFixedRate(createNotificationConsumer(), 0, 100, TimeUnit.MILLISECONDS); + + Thread.sleep(200); + + List nonHeatNotification = List.of(notificationFromMessageBus); + notificationsQueue.add(nonHeatNotification); + Thread.sleep(800); + executorPool.shutdown(); + } + + private String getSdcServiceNotificationWithHeatArtifact() { + return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" + " \"serviceVersion\" : \"1.0\",\r\n" + + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" + + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n" + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" + + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" + " \"artifactType\" : \"YANG_XML\",\r\n" + + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n" + + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n" + + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n" + + " \"artifactType\" : \"HEAT\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" + + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n" + + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n" + + " \"artifactType\" : \"HEAT_ENV\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n" + + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n" + + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" + " \"artifactVersion\" : \"1\",\r\n" + + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" + " }\r\n" + " ]}"; + } + + private String getNotificationWithMultipleResources(){ + return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + + " \"serviceName\" : \"Testnotificationser1\",\r\n" + + " \"serviceVersion\" : \"1.0\",\r\n" + + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" + + " \"resources\" : [{\r\n" + + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" + + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + + " \"resourceVersion\" : \"1.0\",\r\n" + + " \"resoucreType\" : \"VF\",\r\n" + + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" + + " \"artifacts\" : [{\r\n" + + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" + + " \"artifactType\" : \"YANG_XML\",\r\n" + + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n" + + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" + + " \"artifactDescription\" : \"MyYang\",\r\n" + + " \"artifactTimeout\" : 0,\r\n" + + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + + " \"artifactVersion\" : \"1\"\r\n" + + " }" + + " ]\r\n" + + " },\r\n" + + " {\r\n" + + " \"resourceInstanceName\" : \"testnotificationvf12\",\r\n" + + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + + " \"resourceVersion\" : \"1.0\",\r\n" + + " \"resoucreType\" : \"VF\",\r\n" + + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2e\",\r\n" + + " \"artifacts\" : [{\r\n" + + " \"artifactName\" : \"heat.yaml\",\r\n" + + " \"artifactType\" : \"HEAT\",\r\n" + + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" + + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + + " \"artifactDescription\" : \"heat\",\r\n" + + " \"artifactTimeout\" : 60,\r\n" + + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + + " \"artifactVersion\" : \"1\"\r\n" + + " }" + + " ]\r\n" + + " }\r\n" + + " ]}"; + } + + + private String getSdcNotificationWithNonExistentArtifact() { + return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" + " \"serviceVersion\" : \"1.0\",\r\n" + + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" + " \"bugabuga\" : \"xyz\",\r\n" + " \"resources\" : [{\r\n" + + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n" + " \"resoucreType\" : \"VF\",\r\n" + + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n" + " \"artifactType\" : \"HEAT\",\r\n" + + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" + + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n" + + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactBuga\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + + " }, {\r\n" + " \"artifactName\" : \"buga.bug\",\r\n" + " \"artifactType\" : \"BUGA_BUGA\",\r\n" + + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n" + + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n" + + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" + " \"artifactVersion\" : \"1\",\r\n" + + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" + " }\r\n" + " ]}"; + } + + private String getSdcServiceNotificationWithRelatedArtifacts() { + return "{\"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" + " \"serviceVersion\" : \"1.0\",\r\n" + + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" + + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n" + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" + + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" + " \"artifactType\" : \"YANG_XML\",\r\n" + + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n" + + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n" + + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\",\r\n" + " \"relatedArtifacts\" : [\r\n" + + " \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\"\r\n" + " ]" + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n" + + " \"artifactType\" : \"HEAT\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" + + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n" + + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\", \r\n" + " \"relatedArtifacts\" : [\r\n" + + " \"0005bc4a-2c19-452e-be6d-d574a56be4d0\", \r\n" + " \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\"\r\n" + " ]" + " }, {\r\n" + + " \"artifactName\" : \"heat.env\",\r\n" + " \"artifactType\" : \"HEAT_ENV\",\r\n" + + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n" + + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n" + + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" + " \"artifactVersion\" : \"1\",\r\n" + + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" + " }\r\n" + " ]}"; + } + + private String getSdcServiceNotificationWithoutHeatArtifact() { + return "{" + " \"distributionID\" : \"5v1234d8-5b6d-42c4-7t54-47v95n58qb7\"," + " \"serviceName\" : \"srv1\"," + " \"serviceVersion\": \"2.0\"," + " \"serviceUUID\" : \"4e0697d8-5b6d-42c4-8c74-46c33d46624c\"," + + " \"serviceArtifacts\":[" + " {" + " \"artifactName\" : \"ddd.yml\"," + " \"artifactType\" : \"DG_XML\"," + " \"artifactTimeout\" : \"65\"," + + " \"artifactDescription\" : \"description\"," + " \"artifactURL\" :" + " \"/sdc/v1/catalog/services/srv1/2.0/resources/ddd/3.0/artifacts/ddd.xml\" ," + + " \"resourceUUID\" : \"4e5874d8-5b6d-42c4-8c74-46c33d90drw\" ," + " \"checksum\" : \"15e389rnrp58hsw==\"" + " }" + " ]" + "}"; + } + + private String getNotificationWithServiceArtifatcs() { + return "{\r\n" + " \"distributionID\" : \"bcc7a72e-90b1-4c5f-9a37-28dc3cd86416\",\r\n" + " \"serviceName\" : \"Testnotificationser1\",\r\n" + " \"serviceVersion\" : \"1.0\",\r\n" + + " \"serviceUUID\" : \"7f7f94f4-373a-4b71-a0e3-80ae2ba4eb5d\",\r\n" + " \"serviceDescription\" : \"TestNotificationVF1\",\r\n" + " \"serviceArtifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" + + " \"artifactType\" : \"YANG_XML\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n" + + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n" + + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n" + + " \"artifactType\" : \"HEAT\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" + + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n" + + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n" + + " \"artifactType\" : \"HEAT_ENV\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n" + + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n" + + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" + " \"artifactVersion\" : \"1\",\r\n" + + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ],\r\n" + " \"resources\" : [{\r\n" + " \"resourceInstanceName\" : \"testnotificationvf11\",\r\n" + + " \"resourceName\" : \"TestNotificationVF1\",\r\n" + " \"resourceVersion\" : \"1.0\",\r\n" + " \"resoucreType\" : \"VF\",\r\n" + " \"resourceUUID\" : \"907e1746-9f69-40f5-9f2a-313654092a2d\",\r\n" + + " \"artifacts\" : [{\r\n" + " \"artifactName\" : \"sample-xml-alldata-1-1.xml\",\r\n" + " \"artifactType\" : \"YANG_XML\",\r\n" + + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/sample-xml-alldata-1-1.xml\",\r\n" + + " \"artifactChecksum\" : \"MTUxODFkMmRlOTNhNjYxMGYyYTI1ZjA5Y2QyNWQyYTk\\u003d\",\r\n" + " \"artifactDescription\" : \"MyYang\",\r\n" + " \"artifactTimeout\" : 0,\r\n" + + " \"artifactUUID\" : \"0005bc4a-2c19-452e-be6d-d574a56be4d0\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.yaml\",\r\n" + + " \"artifactType\" : \"HEAT\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.yaml\",\r\n" + + " \"artifactChecksum\" : \"ODEyNjE4YTMzYzRmMTk2ODVhNTU2NTg3YWEyNmIxMTM\\u003d\",\r\n" + " \"artifactDescription\" : \"heat\",\r\n" + " \"artifactTimeout\" : 60,\r\n" + + " \"artifactUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\",\r\n" + " \"artifactVersion\" : \"1\"\r\n" + " }, {\r\n" + " \"artifactName\" : \"heat.env\",\r\n" + + " \"artifactType\" : \"HEAT_ENV\",\r\n" + " \"artifactURL\" : \"/sdc/v1/catalog/services/Testnotificationser1/1.0/resourceInstances/testnotificationvf11/artifacts/heat.env\",\r\n" + + " \"artifactChecksum\" : \"NGIzMjExZTM1NDc2NjBjOTQyMGJmMWNiMmU0NTE5NzM\\u003d\",\r\n" + " \"artifactDescription\" : \"Auto-generated HEAT Environment deployment artifact\",\r\n" + + " \"artifactTimeout\" : 0,\r\n" + " \"artifactUUID\" : \"ce65d31c-35c0-43a9-90c7-596fc51d0c86\",\r\n" + " \"artifactVersion\" : \"1\",\r\n" + + " \"generatedFromUUID\" : \"8df6123c-f368-47d3-93be-1972cefbcc35\"\r\n" + " }\r\n" + " ]\r\n" + " }\r\n" + " ]\r\n" + "}"; + } + + private int countInstances(List list, T element) { + int count = 0; + for (T curr : list) { + if (curr.equals(element)) { + count++; + } + } + return count; + } +} \ No newline at end of file diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java index 4d61542..ed15094 100644 --- a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java +++ b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java @@ -20,115 +20,71 @@ package org.onap.sdc.utils; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaPublisher; -import java.io.IOException; -import java.util.Collections; -import java.util.List; +import java.util.concurrent.Future; +import nl.altindag.log.LogCaptor; +import org.apache.kafka.common.KafkaException; import org.junit.jupiter.api.Test; import org.onap.sdc.api.results.IDistributionClientResult; import org.onap.sdc.impl.DistributionClientResultImpl; +import org.onap.sdc.utils.kafka.SdcKafkaProducer; class NotificationSenderTest { private final String status = "status"; - private final CambriaPublisher.message message = new CambriaPublisher.message("sample-partition", "sample-message"); - private final List notEmptySendingFailedMessages = Collections.singletonList(message); + private final DistributionClientResultImpl successResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); - private final DistributionClientResultImpl generalErrorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, - "Failed to send status"); - - private final CambriaBatchingPublisher publisher = mock(CambriaBatchingPublisher.class); - private final List emptyServers = Collections.emptyList(); - private final NotificationSender validNotificationSender = new NotificationSender(emptyServers); + private final DistributionClientResultImpl generalErrorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); + private final SdcKafkaProducer producer = mock(SdcKafkaProducer.class); + private final NotificationSender validNotificationSender = new NotificationSender(producer); @Test - void whenPublisherIsValidAndNoExceptionsAreThrownShouldReturnSuccessStatus() throws IOException, InterruptedException { + void whenPublisherIsValidAndNoExceptionsAreThrownShouldReturnSuccessStatus() { //given - when(publisher.send(anyString(), anyString())).thenReturn(0); - when(publisher.close(anyLong(), any())).thenReturn(Collections.emptyList()); + when(producer.send(anyString(), anyString(), anyString())).thenReturn(mock(Future.class)); //when - IDistributionClientResult result = validNotificationSender.send(publisher, status); + IDistributionClientResult result = validNotificationSender.send("mytopic", status); //then assertEquals(successResponse.getDistributionActionResult(), result.getDistributionActionResult()); } @Test - void whenPublisherCouldNotSendShouldReturnGeneralErrorStatus() throws IOException, InterruptedException { + void whenPublisherCouldNotSendShouldReturnGeneralErrorStatus() { //given - when(publisher.send(anyString(), anyString())).thenReturn(0); - when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages); + when(producer.send(anyString(), anyString(), anyString())).thenReturn(mock(Future.class)); + doThrow(KafkaException.class) + .when(producer) + .flush(); //when - IDistributionClientResult result = validNotificationSender.send(publisher, status); + IDistributionClientResult result = validNotificationSender.send("mytopic", status); //then assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult()); } @Test - void whenSendingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException { - //given - when(publisher.send(anyString(), anyString())).thenThrow(new IOException()); - when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages); - - //when - IDistributionClientResult result = validNotificationSender.send(publisher, status); + void whenSendingThrowsIOExceptionShouldReturnGeneralErrorStatus() { + LogCaptor logCaptor = LogCaptor.forClass(NotificationSender.class); - //then - assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult()); - } - - @Test - void whenSendingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException { //given - when(publisher.send(anyString(), anyString())).thenAnswer(invocationOnMock -> { - throw new InterruptedException(); - }); - when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages); + when(producer.send(anyString(), anyString(), anyString())).thenThrow(new KafkaException()); //when - IDistributionClientResult result = validNotificationSender.send(publisher, status); + validNotificationSender.send("mytopic", status); //then - assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult()); + assertThat(logCaptor.getLogs()).contains("DistributionClient - sendStatus. Failed to send status"); } - @Test - void whenClosingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException { - //given - when(publisher.send(anyString(), anyString())).thenReturn(0); - when(publisher.close(anyLong(), any())).thenThrow(new IOException()); - - //when - IDistributionClientResult result = validNotificationSender.send(publisher, status); - - //then - assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult()); - } - - @Test - void whenClosingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException { - //given - when(publisher.send(anyString(), anyString())).thenReturn(0); - when(publisher.close(anyLong(), any())).thenAnswer(invocationOnMock -> { - throw new InterruptedException(); - }); - - //when - IDistributionClientResult result = validNotificationSender.send(publisher, status); - - //then - assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult()); - } } + diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java new file mode 100644 index 0000000..744e9cc --- /dev/null +++ b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java @@ -0,0 +1,104 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.utils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.contentOf; + +import com.salesforce.kafka.test.KafkaTestCluster; +import com.salesforce.kafka.test.KafkaTestUtils; +import com.salesforce.kafka.test.listeners.BrokerListener; +import com.salesforce.kafka.test.listeners.SaslPlainListener; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junitpioneer.jupiter.SetEnvironmentVariable; +import org.onap.sdc.api.consumer.IConfiguration; +import org.onap.sdc.impl.Configuration; +import org.onap.sdc.utils.kafka.SdcKafkaConsumer; +import org.onap.sdc.utils.kafka.SdcKafkaProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;") +class SdcKafkaTest { + + private static final Logger logger = LoggerFactory.getLogger(SdcKafkaTest.class); + + private static final Configuration configuration = new Configuration(new TestConfiguration()); + private static KafkaTestCluster kafkaTestCluster = null; + private static final String topicName = "my-test-topic"; + + static { + System.setProperty("java.security.auth.login.config", "src/test/resources/jaas.conf"); + } + + @BeforeAll + static void before() throws Exception { + startKafkaService(); + KafkaTestUtils utils = new KafkaTestUtils(kafkaTestCluster); + utils.createTopic(topicName, 1, (short) 1); + configuration.setMsgBusAddress(Collections.singletonList(kafkaTestCluster.getKafkaConnectString())); + } + + @AfterAll + static void after() throws Exception { + kafkaTestCluster.close(); + kafkaTestCluster.stop(); + } + + @Test + void whenProducingCorrectRecordsArePresent() { + SdcKafkaConsumer consumer = new SdcKafkaConsumer(configuration); + consumer.subscribe(topicName); + consumer.poll(); + + SdcKafkaProducer producer = new SdcKafkaProducer(configuration); + producer.send(topicName, "blah", "blah"); + producer.send(topicName, "blah", "blah"); + producer.send(topicName, "blah", "blah"); + producer.flush(); + + List events = consumer.poll(); + + assertThat(events).hasSize(3); + } + + private static void startKafkaService() throws Exception { + final BrokerListener listener = new SaslPlainListener() + .withUsername("kafkaclient") + .withPassword("client-secret"); + + final Properties brokerProperties = new Properties(); + kafkaTestCluster = new KafkaTestCluster( + 1, + brokerProperties, + Collections.singletonList(listener) + ); + + kafkaTestCluster.start(); + logger.debug("Cluster started at: {}", kafkaTestCluster.getKafkaConnectString()); + } +} \ No newline at end of file diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java index ca7abba..c2fc536 100644 --- a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java +++ b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java @@ -22,12 +22,11 @@ package org.onap.sdc.utils; import java.util.ArrayList; import java.util.List; - import org.onap.sdc.api.consumer.IConfiguration; public class TestConfiguration implements IConfiguration { - private String asdcAddress; + private String sdcAddress; private String user; private String password; private int pollingInterval = DistributionClientConstants.MIN_POLLING_INTERVAL_SEC; @@ -36,11 +35,13 @@ public class TestConfiguration implements IConfiguration { private String consumerGroup; private String environmentName; private String comsumerID; + private final String kafkaSecurityProtocolConfig; + private final String kafkaSaslMechanism; + private final String kafkaSaslJaasConfig; private String keyStorePath; private String keyStorePassword; private boolean activateServerTLSAuth; private boolean isFilterInEmptyResources; - private boolean useHttpsWithDmaap; private boolean useHttpsWithSDC; private List msgBusAddress; private String httpProxyHost; @@ -48,11 +49,16 @@ public class TestConfiguration implements IConfiguration { private String httpsProxyHost; private int httpsProxyPort; private boolean useSystemProxy; + private String sdcStatusTopicName; + private String sdcNotificationTopicName; public TestConfiguration(IConfiguration other) { - this.asdcAddress = other.getAsdcAddress(); + this.sdcAddress = other.getSdcAddress(); this.comsumerID = other.getConsumerID(); this.consumerGroup = other.getConsumerGroup(); + this.kafkaSecurityProtocolConfig = other.getKafkaSecurityProtocolConfig(); + this.kafkaSaslMechanism = other.getKafkaSaslMechanism(); + this.kafkaSaslJaasConfig = other.getKafkaSaslJaasConfig(); this.environmentName = other.getEnvironmentName(); this.password = other.getPassword(); this.pollingInterval = other.getPollingInterval(); @@ -71,30 +77,37 @@ public class TestConfiguration implements IConfiguration { } public TestConfiguration() { - this.asdcAddress = "localhost:8443"; + this.sdcAddress = "localhost:8443"; this.comsumerID = "mso-123456"; this.consumerGroup = "mso-group"; this.environmentName = "PROD"; this.password = "password"; this.pollingInterval = 20; this.pollingTimeout = 20; - this.relevantArtifactTypes = new ArrayList(); + this.setSdcStatusTopicName("SDC-STATUS-TOPIC"); + this.setSdcNotificationTopicName("SDC-NOTIF-TOPIC"); + this.relevantArtifactTypes = new ArrayList<>(); this.relevantArtifactTypes.add(ArtifactTypeEnum.HEAT.name()); this.user = "mso-user"; - this.keyStorePath = "etc/asdc-client.jks"; + this.keyStorePath = "etc/sdc-client.jks"; this.keyStorePassword = "Aa123456"; this.activateServerTLSAuth = false; this.isFilterInEmptyResources = false; this.useHttpsWithSDC = true; - msgBusAddress = new ArrayList(); - msgBusAddress.add("www.cnn.com"); - msgBusAddress.add("www.cnn.com"); - msgBusAddress.add("www.cnn.com"); + msgBusAddress = new ArrayList<>(); + msgBusAddress.add("kafka-bootstrap1:9092"); + msgBusAddress.add("kafka-bootstrap2:9092"); + msgBusAddress.add("kafka-bootstrap3:9092"); + this.kafkaSecurityProtocolConfig = "SASL_PLAINTEXT"; + this.kafkaSaslMechanism = "PLAIN"; + this.kafkaSaslJaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;"; + this.httpProxyHost = "proxy"; + this.httpProxyPort = 8080; } @Override - public String getAsdcAddress() { - return asdcAddress; + public String getSdcAddress() { + return sdcAddress; } @Override @@ -102,6 +115,21 @@ public class TestConfiguration implements IConfiguration { return msgBusAddress; } + @Override + public String getKafkaSecurityProtocolConfig() { + return kafkaSecurityProtocolConfig; + } + + @Override + public String getKafkaSaslMechanism() { + return kafkaSaslMechanism; + } + + @Override + public String getKafkaSaslJaasConfig() { + return kafkaSaslJaasConfig; + } + @Override public String getUser() { return user; @@ -185,8 +213,8 @@ public class TestConfiguration implements IConfiguration { this.comsumerID = comsumerID; } - public void setAsdcAddress(String asdcAddress) { - this.asdcAddress = asdcAddress; + public void setSdcAddress(String sdcAddress) { + this.sdcAddress = sdcAddress; } public void setUser(String user) { @@ -249,7 +277,7 @@ public class TestConfiguration implements IConfiguration { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((asdcAddress == null) ? 0 : asdcAddress.hashCode()); + result = prime * result + ((sdcAddress == null) ? 0 : sdcAddress.hashCode()); result = prime * result + ((comsumerID == null) ? 0 : comsumerID.hashCode()); result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode()); result = prime * result + ((environmentName == null) ? 0 : environmentName.hashCode()); @@ -280,10 +308,10 @@ public class TestConfiguration implements IConfiguration { if (getClass() != obj.getClass()) return false; TestConfiguration other = (TestConfiguration) obj; - if (asdcAddress == null) { - if (other.asdcAddress != null) + if (sdcAddress == null) { + if (other.sdcAddress != null) return false; - } else if (!asdcAddress.equals(other.asdcAddress)) + } else if (!sdcAddress.equals(other.sdcAddress)) return false; if (comsumerID == null) { if (other.comsumerID != null) @@ -325,17 +353,14 @@ public class TestConfiguration implements IConfiguration { } else if (!keyStorePath.equals(other.keyStorePath)) return false; if (keyStorePassword == null) { - if (other.keyStorePassword != null) - return false; - } else if (!keyStorePassword.equals(other.keyStorePassword)) - return false; - - return true; + return other.keyStorePassword == null; + } else + return keyStorePassword.equals(other.keyStorePassword); } @Override public String toString() { - return "TestConfiguration [asdcAddress=" + asdcAddress + ", user=" + user + ", password=" + password + return "TestConfiguration [sdcAddress=" + sdcAddress + ", user=" + user + ", password=" + password + ", pollingInterval=" + pollingInterval + ", pollingTimeout=" + pollingTimeout + ", relevantArtifactTypes=" + relevantArtifactTypes + ", consumerGroup=" + consumerGroup + ", environmentName=" + environmentName + ", comsumerID=" + comsumerID + "]"; @@ -350,11 +375,6 @@ public class TestConfiguration implements IConfiguration { this.isFilterInEmptyResources = isFilterInEmptyResources; } - @Override - public Boolean isUseHttpsWithDmaap() { - return this.useHttpsWithDmaap; - } - @Override public Boolean isUseHttpsWithSDC() { return this.useHttpsWithSDC; @@ -363,4 +383,20 @@ public class TestConfiguration implements IConfiguration { public void setUseHttpsWithSDC(Boolean useHttpsWithSDC) { this.useHttpsWithSDC = useHttpsWithSDC; } + + public String getSdcStatusTopicName() { + return sdcStatusTopicName; + } + + public void setSdcStatusTopicName(String sdcStatusTopicName) { + this.sdcStatusTopicName = sdcStatusTopicName; + } + + public String getSdcNotificationTopicName() { + return sdcNotificationTopicName; + } + + public void setSdcNotificationTopicName(String sdcNotificationTopicName) { + this.sdcNotificationTopicName = sdcNotificationTopicName; + } } diff --git a/sdc-distribution-client/src/test/resources/asdc-client.jks b/sdc-distribution-client/src/test/resources/asdc-client.jks deleted file mode 100644 index eb0a0d3..0000000 Binary files a/sdc-distribution-client/src/test/resources/asdc-client.jks and /dev/null differ diff --git a/sdc-distribution-client/src/test/resources/jaas.conf b/sdc-distribution-client/src/test/resources/jaas.conf new file mode 100644 index 0000000..6f7fb5a --- /dev/null +++ b/sdc-distribution-client/src/test/resources/jaas.conf @@ -0,0 +1,20 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin-secret" + user_admin="admin-secret" + user_kafkaclient="client-secret"; +}; + +Server { + org.apache.zookeeper.server.auth.DigestLoginModule required + username="admin" + password="admin-secret" + user_zooclient="client-secret"; +}; + +Client { + org.apache.zookeeper.server.auth.DigestLoginModule required + username="zooclient" + password="client-secret"; +}; \ No newline at end of file diff --git a/sdc-distribution-client/src/test/resources/log4j.xml b/sdc-distribution-client/src/test/resources/log4j.xml new file mode 100644 index 0000000..75e94c1 --- /dev/null +++ b/sdc-distribution-client/src/test/resources/log4j.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/sdc-distribution-client/src/test/resources/sdc-client.jks b/sdc-distribution-client/src/test/resources/sdc-client.jks new file mode 100644 index 0000000..eb0a0d3 Binary files /dev/null and b/sdc-distribution-client/src/test/resources/sdc-client.jks differ diff --git a/version.properties b/version.properties index d1722d6..890f74f 100644 --- a/version.properties +++ b/version.properties @@ -3,9 +3,9 @@ # Note that these variables cannot be structured (e.g. : version.release or version.snapshot etc... ) # because they are used in Jenkins, whose plug-in doesn't support -major=1 -minor=4 -patch=5 +major=2 +minor=0 +patch=0 base_version=${major}.${minor}.${patch} -- cgit 1.2.3-korg