aboutsummaryrefslogtreecommitdiffstats
path: root/sdc-distribution-client/src/main/java/org/onap/sdc/impl
diff options
context:
space:
mode:
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/impl')
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java84
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java30
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java482
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java4
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java72
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java29
6 files changed, 288 insertions, 413 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
index 2da9c4e..db4433b 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
@@ -28,7 +28,12 @@ import org.onap.sdc.utils.DistributionClientConstants;
public class Configuration implements IConfiguration {
private List<String> msgBusAddressList;
- private String asdcAddress;
+ private final String kafkaSecurityProtocolConfig;
+ private final String kafkaSaslMechanism;
+ private final String kafkaSaslJaasConfig;
+ private String sdcStatusTopicName;
+ private String sdcNotificationTopicName;
+ private String sdcAddress;
private String user;
private String password;
private int pollingInterval = DistributionClientConstants.MIN_POLLING_INTERVAL_SEC;
@@ -40,10 +45,9 @@ public class Configuration implements IConfiguration {
private String keyStorePath;
private String keyStorePassword;
private boolean activateServerTLSAuth;
- private boolean filterInEmptyResources;
- private Boolean useHttpsWithDmaap;
+ private final boolean filterInEmptyResources;
private Boolean useHttpsWithSDC;
- private boolean consumeProduceStatusTopic;
+ private final boolean consumeProduceStatusTopic;
private String httpProxyHost;
private int httpProxyPort;
private String httpsProxyHost;
@@ -51,23 +55,24 @@ public class Configuration implements IConfiguration {
private boolean useSystemProxy;
public Configuration(IConfiguration other) {
- this.asdcAddress = other.getAsdcAddress();
- this.msgBusAddressList = other.getMsgBusAddress();
+ this.kafkaSecurityProtocolConfig = other.getKafkaSecurityProtocolConfig();
+ this.kafkaSaslMechanism = other.getKafkaSaslMechanism();
+ this.kafkaSaslJaasConfig = other.getKafkaSaslJaasConfig();
this.comsumerID = other.getConsumerID();
this.consumerGroup = other.getConsumerGroup();
- this.environmentName = other.getEnvironmentName();
- this.password = other.getPassword();
this.pollingInterval = other.getPollingInterval();
this.pollingTimeout = other.getPollingTimeout();
- this.relevantArtifactTypes = other.getRelevantArtifactTypes();
+ this.environmentName = other.getEnvironmentName();
+ this.consumeProduceStatusTopic = other.isConsumeProduceStatusTopic();
+ this.sdcAddress = other.getSdcAddress();
this.user = other.getUser();
+ this.password = other.getPassword();
+ this.relevantArtifactTypes = other.getRelevantArtifactTypes();
this.useHttpsWithSDC = other.isUseHttpsWithSDC();
this.keyStorePath = other.getKeyStorePath();
this.keyStorePassword = other.getKeyStorePassword();
this.activateServerTLSAuth = other.activateServerTLSAuth();
this.filterInEmptyResources = other.isFilterInEmptyResources();
- this.useHttpsWithDmaap = other.isUseHttpsWithDmaap();
- this.consumeProduceStatusTopic = other.isConsumeProduceStatusTopic();
this.httpProxyHost = other.getHttpProxyHost();
this.httpProxyPort = other.getHttpProxyPort();
this.httpsProxyHost = other.getHttpsProxyHost();
@@ -76,8 +81,24 @@ public class Configuration implements IConfiguration {
}
@Override
- public String getAsdcAddress() {
- return asdcAddress;
+ public String getSdcAddress() {
+ return sdcAddress;
+ }
+
+ public String getStatusTopicName() {
+ return sdcStatusTopicName;
+ }
+
+ public void setStatusTopicName(String sdcStatusTopicName) {
+ this.sdcStatusTopicName = sdcStatusTopicName;
+ }
+
+ public String getNotificationTopicName() {
+ return sdcNotificationTopicName;
+ }
+
+ public void setNotificationTopicName(String sdcNotificationTopicName) {
+ this.sdcNotificationTopicName = sdcNotificationTopicName;
}
@Override
@@ -85,6 +106,25 @@ public class Configuration implements IConfiguration {
return msgBusAddressList;
}
+ public void setMsgBusAddress(List<String> newMsgBusAddress) {
+ msgBusAddressList = newMsgBusAddress;
+ }
+
+ @Override
+ public String getKafkaSecurityProtocolConfig() {
+ return kafkaSecurityProtocolConfig;
+ }
+
+ @Override
+ public String getKafkaSaslMechanism() {
+ return kafkaSaslMechanism;
+ }
+
+ @Override
+ public String getKafkaSaslJaasConfig() {
+ return kafkaSaslJaasConfig;
+ }
+
@Override
public Boolean isUseHttpsWithSDC() {
return useHttpsWithSDC;
@@ -169,8 +209,8 @@ public class Configuration implements IConfiguration {
this.comsumerID = comsumerID;
}
- public void setAsdcAddress(String asdcAddress) {
- this.asdcAddress = asdcAddress;
+ public void setSdcAddress(String sdcAddress) {
+ this.sdcAddress = sdcAddress;
}
public void setUser(String user) {
@@ -243,19 +283,10 @@ public class Configuration implements IConfiguration {
return this.filterInEmptyResources;
}
- @Override
- public Boolean isUseHttpsWithDmaap() {
- return this.useHttpsWithDmaap;
- }
-
public void setUseHttpsWithSDC(boolean useHttpsWithSDC) {
this.useHttpsWithSDC = useHttpsWithSDC;
}
- public void setUseHttpsWithDmaap(boolean useHttpsWithDmaap) {
- this.useHttpsWithDmaap = useHttpsWithDmaap;
- }
-
@Override
public boolean isConsumeProduceStatusTopic() {
return this.consumeProduceStatusTopic;
@@ -265,11 +296,13 @@ public class Configuration implements IConfiguration {
public String toString() {
//@formatter:off
return "Configuration ["
- + "asdcAddress=" + asdcAddress
+ + "sdcAddress=" + sdcAddress
+ ", user=" + user
+ ", password=" + password
+ ", useHttpsWithSDC=" + useHttpsWithSDC
+ ", pollingInterval=" + pollingInterval
+ + ", sdcStatusTopicName=" + sdcStatusTopicName
+ + ", sdcNotificationTopicName=" + sdcNotificationTopicName
+ ", pollingTimeout=" + pollingTimeout
+ ", relevantArtifactTypes=" + relevantArtifactTypes
+ ", consumerGroup=" + consumerGroup
@@ -279,7 +312,6 @@ public class Configuration implements IConfiguration {
+ ", keyStorePassword=" + keyStorePassword
+ ", activateServerTLSAuth=" + activateServerTLSAuth
+ ", filterInEmptyResources=" + filterInEmptyResources
- + ", useHttpsWithDmaap=" + useHttpsWithDmaap
+ ", consumeProduceStatusTopic=" + consumeProduceStatusTopic
+ ", useSystemProxy=" + useSystemProxy
+ ", httpProxyHost=" + httpProxyHost
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java
index b645ed1..829c6ce 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java
@@ -19,23 +19,22 @@
*/
package org.onap.sdc.impl;
-import org.onap.sdc.api.consumer.IConfiguration;
-import org.onap.sdc.api.consumer.IStatusCallback;
-import org.onap.sdc.utils.DistributionActionResultEnum;
-import org.onap.sdc.utils.DistributionClientConstants;
-
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.regex.Matcher;
+import org.onap.sdc.api.consumer.IConfiguration;
+import org.onap.sdc.api.consumer.IStatusCallback;
+import org.onap.sdc.utils.DistributionActionResultEnum;
+import org.onap.sdc.utils.DistributionClientConstants;
public class ConfigurationValidator {
private Map<Function<IConfiguration, Boolean>, DistributionActionResultEnum> cachedValidators;
- DistributionActionResultEnum validateConfiguration(IConfiguration conf, IStatusCallback statusCallback) {
+ public DistributionActionResultEnum validateConfiguration(IConfiguration conf, IStatusCallback statusCallback) {
final Map<Function<IConfiguration, Boolean>, DistributionActionResultEnum> validators = getValidators(statusCallback);
for (Map.Entry<Function<IConfiguration, Boolean>, DistributionActionResultEnum> validation : validators.entrySet()) {
@@ -53,10 +52,8 @@ public class ConfigurationValidator {
validators.put(isCustomerIdNotSet(), DistributionActionResultEnum.CONF_MISSING_CONSUMER_ID);
validators.put(isUserNotSet(), DistributionActionResultEnum.CONF_MISSING_USERNAME);
validators.put(isPasswordNotSet(), DistributionActionResultEnum.CONF_MISSING_PASSWORD);
- validators.put(isMsgBusAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS);
- validators.put(isAsdcAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN);
- validators.put(isFqdnValid(), DistributionActionResultEnum.CONF_INVALID_ASDC_FQDN);
- validators.put(areFqdnsValid(), DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS);
+ validators.put(isSdcAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_SDC_FQDN);
+ validators.put(isFqdnValid(), DistributionActionResultEnum.CONF_INVALID_SDC_FQDN);
validators.put(isEnvNameNotSet(), DistributionActionResultEnum.CONF_MISSING_ENVIRONMENT_NAME);
validators.put(isRelevantArtifactTypesNotSet(), DistributionActionResultEnum.CONF_MISSING_ARTIFACT_TYPES);
validators.put(isConsumeStatusTopicWithCallbackNotSet(statusCallback), DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG);
@@ -85,17 +82,10 @@ public class ConfigurationValidator {
return it -> it.getEnvironmentName() == null || it.getEnvironmentName().isEmpty();
}
- private Function<IConfiguration, Boolean> areFqdnsValid() {
- return it -> !isValidFqdns(it.getMsgBusAddress());
- }
-
private Function<IConfiguration, Boolean> isFqdnValid() {
- return it -> !isValidFqdn(it.getAsdcAddress());
+ return it -> !isValidFqdn(it.getSdcAddress());
}
- private Function<IConfiguration, Boolean> isMsgBusAddressNotSet() {
- return it -> it.getMsgBusAddress() == null || it.getMsgBusAddress().isEmpty();
- }
private Function<IConfiguration, Boolean> isPasswordNotSet() {
return it -> it.getPassword() == null || it.getPassword().isEmpty();
@@ -113,8 +103,8 @@ public class ConfigurationValidator {
return it -> it.getConsumerID() == null || it.getConsumerID().isEmpty();
}
- private Function<IConfiguration, Boolean> isAsdcAddressNotSet() {
- return it -> it.getAsdcAddress() == null || it.getAsdcAddress().isEmpty();
+ private Function<IConfiguration, Boolean> isSdcAddressNotSet() {
+ return it -> it.getSdcAddress() == null || it.getSdcAddress().isEmpty();
}
static boolean isValidFqdn(String fqdn) {
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
index 3a25abb..a34ba1e 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
@@ -23,11 +23,12 @@ package org.onap.sdc.impl;
import static java.util.Objects.isNull;
-import java.io.IOException;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import fj.data.Either;
import java.lang.reflect.Type;
-import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
-import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -35,8 +36,8 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import org.apache.http.HttpHost;
+import org.apache.kafka.common.KafkaException;
import org.onap.sdc.api.IDistributionClient;
import org.onap.sdc.api.IDistributionStatusMessageJsonBuilder;
import org.onap.sdc.api.consumer.IComponentDoneStatusMessage;
@@ -46,53 +47,35 @@ import org.onap.sdc.api.consumer.IFinalDistrStatusMessage;
import org.onap.sdc.api.consumer.INotificationCallback;
import org.onap.sdc.api.consumer.IStatusCallback;
import org.onap.sdc.api.notification.IArtifactInfo;
+import org.onap.sdc.api.notification.IVfModuleMetadata;
import org.onap.sdc.api.results.IDistributionClientDownloadResult;
import org.onap.sdc.api.results.IDistributionClientResult;
-import org.onap.sdc.http.HttpAsdcClient;
+import org.onap.sdc.http.HttpClientFactory;
+import org.onap.sdc.http.HttpRequestFactory;
+import org.onap.sdc.http.HttpSdcClient;
import org.onap.sdc.http.SdcConnectorClient;
-import org.onap.sdc.http.TopicRegistrationResponse;
import org.onap.sdc.utils.DistributionActionResultEnum;
import org.onap.sdc.utils.DistributionClientConstants;
-import org.onap.sdc.utils.GeneralUtils;
import org.onap.sdc.utils.NotificationSender;
import org.onap.sdc.utils.Pair;
import org.onap.sdc.utils.Wrapper;
-import org.onap.sdc.api.notification.IVfModuleMetadata;
+import org.onap.sdc.utils.kafka.KafkaDataResponse;
+import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
+import org.onap.sdc.utils.kafka.SdcKafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.apiClient.credentials.ApiCredential;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClient.CambriaApiException;
-import com.att.nsa.cambria.client.CambriaClientBuilders.AbstractAuthenticatedManagerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.att.nsa.cambria.client.CambriaIdentityManager;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
-
-import fj.data.Either;
-
public class DistributionClientImpl implements IDistributionClient {
- private static final int POLLING_TIMEOUT_MULTIPLIER = 1000;
private static final int TERMINATION_TIMEOUT = 60;
private final Logger log;
- private SdcConnectorClient asdcConnector;
+ private SdcConnectorClient sdcConnector;
private ScheduledExecutorService executorPool = null;
- protected CambriaIdentityManager cambriaIdentityManager = null;
- private List<String> brokerServers;
- private ApiCredential credential;
+ private SdcKafkaProducer producer;
protected Configuration configuration;
private INotificationCallback callback;
private IStatusCallback statusCallback;
- private String notificationTopic;
- private String statusTopic;
private boolean isConsumerGroupGenerated = false;
private NotificationSender notificationSender;
private final ConfigurationValidator configurationValidator = new ConfigurationValidator();
@@ -110,12 +93,69 @@ public class DistributionClientImpl implements IDistributionClient {
}
@Override
+ public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback, IStatusCallback statusCallback) {
+ IDistributionClientResult initResult;
+ if (!conf.isConsumeProduceStatusTopic()) {
+ initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG,
+ "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'");
+
+ } else if (isNull(statusCallback)) {
+ initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG,
+ "configuration is invalid: statusCallback is not defined");
+ } else {
+ this.statusCallback = statusCallback;
+ initResult = init(conf, notificationCallback);
+ }
+ return initResult;
+ }
+
+ @Override
+ public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback callback) {
+
+ log.info("DistributionClient - init");
+
+ Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
+ validateNotInitilized(errorWrapper);
+ if (errorWrapper.isEmpty()) {
+ validateNotTerminated(errorWrapper);
+ }
+ if (errorWrapper.isEmpty()) {
+ this.configuration = validateAndInitConfiguration(errorWrapper, conf).getSecond();
+ this.sdcConnector = createSdcConnector(configuration);
+ }
+ if (errorWrapper.isEmpty()) {
+ validateArtifactTypesWithSdcServer(conf, errorWrapper);
+ }
+ if (errorWrapper.isEmpty()) {
+ this.callback = callback;
+ }
+ if (errorWrapper.isEmpty()) {
+ initKafkaData(errorWrapper);
+ }
+ if (errorWrapper.isEmpty()) {
+ initKafkaProducer(errorWrapper, configuration);
+ }
+ if (errorWrapper.isEmpty()) {
+ this.notificationSender = new NotificationSender(producer);
+ }
+ IDistributionClientResult result;
+ if (errorWrapper.isEmpty()) {
+ isInitialized = true;
+ result = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS,
+ "distribution client initialized successfully");
+ } else {
+ result = errorWrapper.getInnerElement();
+ }
+
+ return result;
+ }
+
+ @Override
public IConfiguration getConfiguration() {
return configuration;
}
@Override
- /* see javadoc */
public synchronized IDistributionClientResult updateConfiguration(IConfiguration conf) {
log.info("update DistributionClient configuration");
@@ -126,33 +166,34 @@ public class DistributionClientImpl implements IDistributionClient {
return errorWrapper.getInnerElement();
}
- IDistributionClientResult updateResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "configuration updated successfuly");
+ IDistributionClientResult updateResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS,
+ "configuration updated successfully");
- boolean needToUpdateCambriaConsumer = false;
+ boolean needToUpdateConsumer = false;
if (conf.getRelevantArtifactTypes() != null && !conf.getRelevantArtifactTypes().isEmpty()) {
configuration.setRelevantArtifactTypes(conf.getRelevantArtifactTypes());
- needToUpdateCambriaConsumer = true;
+ needToUpdateConsumer = true;
}
if (isPollingIntervalValid(conf.getPollingInterval())) {
configuration.setPollingInterval(conf.getPollingInterval());
- needToUpdateCambriaConsumer = true;
+ needToUpdateConsumer = true;
}
if (isPollingTimeoutValid(conf.getPollingTimeout())) {
configuration.setPollingTimeout(conf.getPollingTimeout());
- needToUpdateCambriaConsumer = true;
+ needToUpdateConsumer = true;
}
if (conf.getConsumerGroup() != null) {
configuration.setConsumerGroup(conf.getConsumerGroup());
isConsumerGroupGenerated = false;
- needToUpdateCambriaConsumer = true;
+ needToUpdateConsumer = true;
} else if (!isConsumerGroupGenerated) {
String generatedConsumerGroup = UUID.randomUUID().toString();
configuration.setConsumerGroup(generatedConsumerGroup);
isConsumerGroupGenerated = true;
}
- if (needToUpdateCambriaConsumer) {
+ if (needToUpdateConsumer) {
updateResult = restartConsumer();
}
@@ -167,7 +208,7 @@ public class DistributionClientImpl implements IDistributionClient {
log.info("start DistributionClient");
IDistributionClientResult startResult;
- CambriaConsumer cambriaNotificationConsumer = null;
+ SdcKafkaConsumer kafkaConsumer = null;
Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
validateRunReady(errorWrapper);
if (errorWrapper.isEmpty()) {
@@ -175,49 +216,49 @@ public class DistributionClientImpl implements IDistributionClient {
}
if (errorWrapper.isEmpty()) {
try {
- cambriaNotificationConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(notificationTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers)
- .withSocketTimeout(configuration.getPollingTimeout() * POLLING_TIMEOUT_MULTIPLIER).build();
-
- } catch (MalformedURLException | GeneralSecurityException e) {
- handleCambriaInitFailure(errorWrapper, e);
+ kafkaConsumer = new SdcKafkaConsumer(configuration);
+ kafkaConsumer.subscribe(configuration.getNotificationTopicName());
+ } catch (KafkaException | IllegalArgumentException e) {
+ handleMessagingClientInitFailure(errorWrapper, e);
}
}
if (errorWrapper.isEmpty()) {
-
- List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes();
- // Remove nulls from list - workaround for how configuration is built
- relevantArtifactTypes.removeAll(Collections.singleton(null));
- NotificationConsumer consumer = new NotificationConsumer(cambriaNotificationConsumer, callback, relevantArtifactTypes, this);
- executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE);
- executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS);
-
- handleStatusConsumer(errorWrapper, executorPool);
+ startNotificationConsumer(kafkaConsumer);
+ startStatusConsumer(errorWrapper, executorPool);
}
if (!errorWrapper.isEmpty()) {
startResult = errorWrapper.getInnerElement();
} else {
- startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client started successfuly");
+ startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS,
+ "distribution client started successfully");
isStarted = true;
}
return startResult;
}
- private void handleStatusConsumer(Wrapper<IDistributionClientResult> errorWrapper, ScheduledExecutorService executorPool) {
+ private void startNotificationConsumer(SdcKafkaConsumer kafkaConsumer) {
+ List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes();
+ // Remove nulls from list - workaround for how configuration is built
+ relevantArtifactTypes.removeAll(Collections.singleton(null));
+ NotificationConsumer consumer = new NotificationConsumer(kafkaConsumer, callback, relevantArtifactTypes, this);
+ executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE);
+ executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS);
+ }
+
+ private void startStatusConsumer(Wrapper<IDistributionClientResult> errorWrapper, ScheduledExecutorService executorPool) {
if (configuration.isConsumeProduceStatusTopic()) {
- CambriaConsumer cambriaStatusConsumer;
try {
- cambriaStatusConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers)
- .withSocketTimeout(configuration.getPollingTimeout() * POLLING_TIMEOUT_MULTIPLIER).build();
- StatusConsumer statusConsumer = new StatusConsumer(cambriaStatusConsumer, statusCallback);
+ SdcKafkaConsumer kafkaConsumer = new SdcKafkaConsumer(configuration);
+ kafkaConsumer.subscribe(configuration.getStatusTopicName());
+ StatusConsumer statusConsumer = new StatusConsumer(kafkaConsumer, statusCallback);
executorPool.scheduleAtFixedRate(statusConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS);
- } catch (MalformedURLException | GeneralSecurityException e) {
- handleCambriaInitFailure(errorWrapper, e);
+ } catch (KafkaException | IllegalArgumentException e) {
+ handleMessagingClientInitFailure(errorWrapper, e);
}
}
}
@Override
- /* see javadoc */
public synchronized IDistributionClientResult stop() {
log.info("stop DistributionClient");
@@ -226,30 +267,13 @@ public class DistributionClientImpl implements IDistributionClient {
if (!errorWrapper.isEmpty()) {
return errorWrapper.getInnerElement();
}
- // 1. stop polling notification topic
shutdownExecutor();
- // 2. send to ASDC unregister to topic
- IDistributionClientResult unregisterResult = asdcConnector.unregisterTopics(credential);
- if (unregisterResult.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) {
- log.info("client failed to unregister from topics");
- } else {
- log.info("client unregistered from topics successfully");
- }
- asdcConnector.close();
-
- try {
- cambriaIdentityManager.deleteCurrentApiKey();
- } catch (HttpException | IOException e) {
- log.debug("failed to delete cambria keys", e);
- }
- cambriaIdentityManager.close();
-
+ sdcConnector.close();
isInitialized = false;
isTerminated = true;
- DistributionClientResultImpl stopResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client stopped successfuly");
- return stopResult;
+ return new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client stopped successfully");
}
@Override
@@ -259,167 +283,59 @@ public class DistributionClientImpl implements IDistributionClient {
validateRunReady(errorWrapper);
if (!errorWrapper.isEmpty()) {
IDistributionClientResult result = errorWrapper.getInnerElement();
- IDistributionClientDownloadResult downloadResult = new DistributionClientDownloadResultImpl(result.getDistributionActionResult(), result.getDistributionMessageResult());
- return downloadResult;
+ return new DistributionClientDownloadResultImpl(result.getDistributionActionResult(), result.getDistributionMessageResult());
}
- return asdcConnector.downloadArtifact(artifactInfo);
+ return sdcConnector.downloadArtifact(artifactInfo);
}
- @Override
- public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback,
- IStatusCallback statusCallback) {
- IDistributionClientResult initResult;
- if (!conf.isConsumeProduceStatusTopic()) {
- initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'");
-
- } else if (isNull(statusCallback)) {
- initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: statusCallback is not defined");
- } else {
- this.statusCallback = statusCallback;
- initResult = init(conf, notificationCallback);
- }
- return initResult;
+ SdcConnectorClient createSdcConnector(Configuration configuration) {
+ return new SdcConnectorClient(configuration, new HttpSdcClient(configuration.getSdcAddress(),
+ new HttpClientFactory(configuration),
+ new HttpRequestFactory(configuration.getUser(), configuration.getPassword())));
}
- @Override
- /*
- * see javadoc
- */
- public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback callback) {
-
- log.info("DistributionClient - init");
-
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateNotInitilized(errorWrapper);
- if (errorWrapper.isEmpty()) {
- validateNotTerminated(errorWrapper);
- }
- if (errorWrapper.isEmpty()) {
- this.configuration = validateAndInitConfiguration(errorWrapper, conf).getSecond();
- this.asdcConnector = createAsdcConnector(this.configuration);
- }
- // 1. get ueb server list from configuration
- if (errorWrapper.isEmpty()) {
- List<String> servers = initUebServerList(errorWrapper);
- if (servers != null) {
- this.brokerServers = servers;
- }
- }
- // 2.validate artifact types against asdc server
- if (errorWrapper.isEmpty()) {
- validateArtifactTypesWithAsdcServer(conf, errorWrapper);
- }
- // 3. create keys
- if (errorWrapper.isEmpty()) {
- this.callback = callback;
- ApiCredential apiCredential = createUebKeys(errorWrapper);
- if (apiCredential != null) {
- this.credential = apiCredential;
- }
- }
- // 4. register for topics
- if (errorWrapper.isEmpty()) {
- TopicRegistrationResponse topics = registerForTopics(errorWrapper, this.credential);
- if (topics != null) {
- this.notificationTopic = topics.getDistrNotificationTopicName();
- this.statusTopic = topics.getDistrStatusTopicName();
- this.notificationSender = createNotificationSender();
- }
- }
-
- IDistributionClientResult result;
- if (errorWrapper.isEmpty()) {
- isInitialized = true;
- result = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client initialized successfuly");
- } else {
- result = errorWrapper.getInnerElement();
- }
-
- return result;
- }
-
- SdcConnectorClient createAsdcConnector(Configuration configuration) {
- return new SdcConnectorClient(configuration, new HttpAsdcClient(configuration));
- }
-
- private NotificationSender createNotificationSender() {
- return new NotificationSender(brokerServers);
- }
-
- private TopicRegistrationResponse registerForTopics(Wrapper<IDistributionClientResult> errorWrapper, ApiCredential credential) {
- Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics = asdcConnector.registerAsdcTopics(credential);
- if (registerAsdcTopics.isRight()) {
-
- try {
- cambriaIdentityManager.deleteCurrentApiKey();
- } catch (HttpException | IOException e) {
- log.debug("failed to delete cambria keys", e);
- }
- errorWrapper.setInnerElement(registerAsdcTopics.right().value());
- } else {
- return registerAsdcTopics.left().value();
- }
- return null;
- }
-
- private ApiCredential createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) {
- ApiCredential apiCredential = null;
-
- initCambriaClient(errorWrapper);
- if (errorWrapper.isEmpty()) {
- log.debug("create keys");
- Pair<DistributionClientResultImpl, ApiCredential> uebKeys = createUebKeys();
- DistributionClientResultImpl createKeysResponse = uebKeys.getFirst();
- apiCredential = uebKeys.getSecond();
- if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) {
- errorWrapper.setInnerElement(createKeysResponse);
- }
- }
- return apiCredential;
- }
-
- private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) {
- Either<List<String>, IDistributionClientResult> eitherValidArtifactTypesList = asdcConnector.getValidArtifactTypesList();
+ private void validateArtifactTypesWithSdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) {
+ Either<List<String>, IDistributionClientResult> eitherValidArtifactTypesList = sdcConnector.getValidArtifactTypesList();
if (eitherValidArtifactTypesList.isRight()) {
DistributionActionResultEnum errorType = eitherValidArtifactTypesList.right().value().getDistributionActionResult();
- // Support the case of a new client and older ASDC Server which does not have the API
- if (errorType != DistributionActionResultEnum.ASDC_NOT_FOUND) {
+ // Support the case of a new client and older SDC Server which does not have the API
+ if (errorType != DistributionActionResultEnum.SDC_NOT_FOUND) {
errorWrapper.setInnerElement(eitherValidArtifactTypesList.right().value());
}
} else {
- final List<String> artifactTypesFromAsdc = eitherValidArtifactTypesList.left().value();
- boolean isArtifactTypesValid = artifactTypesFromAsdc.containsAll(conf.getRelevantArtifactTypes());
+ final List<String> artifactTypesFromSdc = eitherValidArtifactTypesList.left().value();
+ boolean isArtifactTypesValid = artifactTypesFromSdc.containsAll(conf.getRelevantArtifactTypes());
if (!isArtifactTypesValid) {
- List<String> invalidArtifactTypes = new ArrayList<>();
- invalidArtifactTypes.addAll(conf.getRelevantArtifactTypes());
- invalidArtifactTypes.removeAll(artifactTypesFromAsdc);
+ List<String> invalidArtifactTypes = new ArrayList<>(conf.getRelevantArtifactTypes());
+ invalidArtifactTypes.removeAll(artifactTypesFromSdc);
DistributionClientResultImpl errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_CONTAINS_INVALID_ARTIFACT_TYPES,
- "configuration contains invalid artifact types:" + invalidArtifactTypes + " valid types are:" + artifactTypesFromAsdc);
+ "configuration contains invalid artifact types:" + invalidArtifactTypes + " valid types are:" + artifactTypesFromSdc);
errorWrapper.setInnerElement(errorResponse);
} else {
- log.debug("Artifact types: {} were validated with ASDC server", conf.getRelevantArtifactTypes());
+ log.debug("Artifact types: {} were validated with SDC server", conf.getRelevantArtifactTypes());
}
}
}
- private List<String> initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) {
- List<String> brokerServers = null;
- log.debug("get ueb cluster server list from component(configuration file)");
-
- Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList();
- if (serverListResponse.isRight()) {
- errorWrapper.setInnerElement(serverListResponse.right().value());
+ private void initKafkaData(Wrapper<IDistributionClientResult> errorWrapper) {
+ log.debug("Get MessageBus cluster information from SDC");
+ Either<KafkaDataResponse, IDistributionClientResult> kafkaData = sdcConnector.getKafkaDistData();
+ if (kafkaData.isRight()) {
+ errorWrapper.setInnerElement(kafkaData.right().value());
} else {
- brokerServers = serverListResponse.left().value();
+ KafkaDataResponse kafkaDataResponse = kafkaData.left().value();
+ configuration.setMsgBusAddress(Collections.singletonList(kafkaDataResponse.getKafkaBootStrapServer()));
+ configuration.setNotificationTopicName(kafkaDataResponse.getDistrNotificationTopicName());
+ configuration.setStatusTopicName(kafkaDataResponse.getDistrStatusTopicName());
+ log.debug("MessageBus cluster info retrieved successfully {}", kafkaData.left().value());
}
-
- return brokerServers;
}
private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) {
if (isInitialized) {
log.warn("distribution client already initialized");
- DistributionClientResultImpl alreadyInitResponse = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, "distribution client already initialized");
+ DistributionClientResultImpl alreadyInitResponse = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED,
+ "distribution client already initialized");
errorWrapper.setInnerElement(alreadyInitResponse);
}
}
@@ -431,31 +347,17 @@ public class DistributionClientImpl implements IDistributionClient {
}
private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder statusBuilder) {
- IDistributionClientResult distributionResult;
-
- Either<CambriaBatchingPublisher, IDistributionClientResult> cambriaPublisher = getCambriaPublisher(statusTopic, configuration, brokerServers, credential);
- if (cambriaPublisher.isRight()) {
- distributionResult = cambriaPublisher.right().value();
- } else {
- String statusMessage = statusBuilder.build();
- distributionResult = notificationSender.send(cambriaPublisher.left().value(), statusMessage);
- }
-
- return distributionResult;
+ return notificationSender.send(configuration.getStatusTopicName(), statusBuilder.build());
}
- private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher(String statusTopic, Configuration configuration, List<String> brokerServers, ApiCredential credential) {
- CambriaBatchingPublisher cambriaPublisher = null;
+ private void initKafkaProducer(Wrapper<IDistributionClientResult> errorWrapper, Configuration configuration) {
try {
- cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap())
- .usingHosts(brokerServers).build();
- cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
- } catch (MalformedURLException | GeneralSecurityException e) {
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- handleCambriaInitFailure(errorWrapper, e);
- return Either.right(errorWrapper.getInnerElement());
+ if (producer == null) {
+ producer = new SdcKafkaProducer(configuration);
+ }
+ } catch (KafkaException | IllegalStateException e) {
+ handleMessagingClientInitFailure(errorWrapper, e);
}
- return Either.left(cambriaPublisher);
}
@Override
@@ -471,27 +373,17 @@ public class DistributionClientImpl implements IDistributionClient {
if (!errorWrapper.isEmpty()) {
return errorWrapper.getInnerElement();
}
- IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified);
+ IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(
+ getConfiguration().getConsumerID(),
+ currentTimeMillis,
+ distributionId,
+ artifactInfo,
+ isNotified);
return sendStatus(builder);
}
/* *************************** Private Methods *************************************************** */
- protected Pair<DistributionClientResultImpl, ApiCredential> createUebKeys() {
- DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly");
- ApiCredential credential = null;
- try {
- String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID());
- credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description);
- cambriaIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
-
- } catch (HttpException | CambriaApiException | IOException e) {
- response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage());
- log.error(response.toString());
- }
- return new Pair<>(response, credential);
- }
-
private IDistributionClientResult restartConsumer() {
shutdownExecutor();
return start();
@@ -500,43 +392,36 @@ public class DistributionClientImpl implements IDistributionClient {
protected Pair<DistributionActionResultEnum, Configuration> validateAndInitConfiguration(Wrapper<IDistributionClientResult> errorWrapper, IConfiguration conf) {
DistributionActionResultEnum result = configurationValidator.validateConfiguration(conf, statusCallback);
- Configuration configuration = null;
+ Configuration configurationInit = null;
if (result == DistributionActionResultEnum.SUCCESS) {
- configuration = createConfiguration(conf);
+ configurationInit = createConfiguration(conf);
} else {
DistributionClientResultImpl initResult = new DistributionClientResultImpl(result, "configuration is invalid: " + result.name());
log.error(initResult.toString());
errorWrapper.setInnerElement(initResult);
}
- return new Pair<>(result, configuration);
+ return new Pair<>(result, configurationInit);
}
private Configuration createConfiguration(IConfiguration conf) {
- Configuration configuration = new Configuration(conf);
+ Configuration configurationCreate = new Configuration(conf);
if (!isPollingIntervalValid(conf.getPollingInterval())) {
- configuration.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC);
+ configurationCreate.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC);
}
if (!isPollingTimeoutValid(conf.getPollingTimeout())) {
- configuration.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC);
+ configurationCreate.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC);
}
if (conf.getConsumerGroup() == null) {
String generatedConsumerGroup = UUID.randomUUID().toString();
- configuration.setConsumerGroup(generatedConsumerGroup);
+ configurationCreate.setConsumerGroup(generatedConsumerGroup);
isConsumerGroupGenerated = true;
}
-
//Default use HTTPS with SDC
if (conf.isUseHttpsWithSDC() == null) {
- configuration.setUseHttpsWithSDC(true);
+ configurationCreate.setUseHttpsWithSDC(true);
}
-
- //Default use HTTPS with DMAAP
- if (conf.isUseHttpsWithDmaap() == null) {
- configuration.setUseHttpsWithDmaap(true);
- }
-
- return configuration;
+ return configurationCreate;
}
private void shutdownExecutor() {
@@ -577,7 +462,8 @@ public class DistributionClientImpl implements IDistributionClient {
private void validateInitialized(Wrapper<IDistributionClientResult> errorWrapper) {
if (!isInitialized) {
log.debug("client was not initialized");
- IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized");
+ IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED,
+ "distribution client was not initialized");
errorWrapper.setInnerElement(result);
}
}
@@ -585,7 +471,8 @@ public class DistributionClientImpl implements IDistributionClient {
private void validateNotStarted(Wrapper<IDistributionClientResult> errorWrapper) {
if (isStarted) {
log.debug("client already started");
- IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_STARTED, "distribution client already started");
+ IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_STARTED,
+ "distribution client already started");
errorWrapper.setInnerElement(result);
}
}
@@ -593,7 +480,8 @@ public class DistributionClientImpl implements IDistributionClient {
private void validateNotTerminated(Wrapper<IDistributionClientResult> errorWrapper) {
if (isTerminated) {
log.debug("client was terminated");
- IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_IS_TERMINATED, "distribution client was terminated");
+ IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_IS_TERMINATED,
+ "distribution client was terminated");
errorWrapper.setInnerElement(result);
}
}
@@ -616,23 +504,10 @@ public class DistributionClientImpl implements IDistributionClient {
return isValid;
}
- private synchronized void initCambriaClient(Wrapper<IDistributionClientResult> errorWrapper) {
- if (cambriaIdentityManager == null) {
- try {
- AbstractAuthenticatedManagerBuilder<CambriaIdentityManager> managerBuilder = new IdentityManagerBuilder().usingHosts(brokerServers);
- if (configuration.isUseHttpsWithDmaap()) {
- managerBuilder = managerBuilder.usingHttps();
- }
- cambriaIdentityManager = managerBuilder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- handleCambriaInitFailure(errorWrapper, e);
- }
- }
- }
- private void handleCambriaInitFailure(Wrapper<IDistributionClientResult> errorWrapper, Exception e) {
- final String errorMessage = "Failed initilizing cambria component:" + e.getMessage();
- IDistributionClientResult errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CAMBRIA_INIT_FAILED, errorMessage);
+ private void handleMessagingClientInitFailure(Wrapper<IDistributionClientResult> errorWrapper, Exception e) {
+ final String errorMessage = "Failed initializing messaging component:" + e.getMessage();
+ IDistributionClientResult errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.MESSAGING_CLIENT_INIT_FAILED, errorMessage);
errorWrapper.setInnerElement(errorResponse);
log.error(errorMessage);
log.debug(errorMessage, e);
@@ -682,8 +557,7 @@ public class DistributionClientImpl implements IDistributionClient {
String vfModuleJsonString = new String(artifactPayload, StandardCharsets.UTF_8);
final Type type = new TypeToken<List<VfModuleMetadata>>() {
}.getType();
- List<IVfModuleMetadata> vfModules = gson.fromJson(vfModuleJsonString, type);
- return vfModules;
+ return gson.fromJson(vfModuleJsonString, type);
}
@@ -702,27 +576,12 @@ public class DistributionClientImpl implements IDistributionClient {
}
-
- public Either<List<String>, IDistributionClientResult> getUEBServerList() {
- List<String> msgBusAddresses = configuration.getMsgBusAddress();
- if (msgBusAddresses.isEmpty()) {
- return Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS, "Message bus address was not found in the config file"));
- } else if (getHttpProxyHost() == null && getHttpsProxyHost() == null) {
- // If there is no proxy configured, convert to valid host name
- return GeneralUtils.convertToValidHostName(msgBusAddresses);
- } else {
- // skip the IP address lookup when proxy is configured and treat all
- // hosts as valid
- return Either.left(msgBusAddresses);
- }
- }
private HttpHost getHttpProxyHost() {
HttpHost proxyHost = null;
- if (configuration.isUseSystemProxy() && System.getProperty("http.proxyHost") != null
- && System.getProperty("http.proxyPort") != null) {
+ if (Boolean.TRUE.equals(configuration.isUseSystemProxy() && System.getProperty("http.proxyHost") != null) && System.getProperty("http.proxyPort") != null) {
proxyHost = new HttpHost(System.getProperty("http.proxyHost"),
- Integer.valueOf(System.getProperty("http.proxyPort")));
+ Integer.parseInt(System.getProperty("http.proxyPort")));
} else if (configuration.getHttpProxyHost() != null && configuration.getHttpProxyPort() != 0) {
proxyHost = new HttpHost(configuration.getHttpProxyHost(), configuration.getHttpProxyPort());
}
@@ -731,10 +590,9 @@ public class DistributionClientImpl implements IDistributionClient {
private HttpHost getHttpsProxyHost() {
HttpHost proxyHost = null;
- if (configuration.isUseSystemProxy() && System.getProperty("https.proxyHost") != null
- && System.getProperty("https.proxyPort") != null) {
+ if (configuration.isUseSystemProxy() && System.getProperty("https.proxyHost") != null && System.getProperty("https.proxyPort") != null) {
proxyHost = new HttpHost(System.getProperty("https.proxyHost"),
- Integer.valueOf(System.getProperty("https.proxyPort")));
+ Integer.parseInt(System.getProperty("https.proxyPort")));
} else if (configuration.getHttpsProxyHost() != null && configuration.getHttpsProxyPort() != 0) {
proxyHost = new HttpHost(configuration.getHttpsProxyHost(), configuration.getHttpsProxyPort());
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java
index 4edd355..77655ac 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java
@@ -25,8 +25,8 @@ import org.onap.sdc.utils.DistributionActionResultEnum;
public class DistributionClientResultImpl implements IDistributionClientResult {
- private DistributionActionResultEnum responseStatus;
- private String responseMessage;
+ private final DistributionActionResultEnum responseStatus;
+ private final String responseMessage;
public DistributionClientResultImpl(DistributionActionResultEnum responseStatus, String responseMessage) {
this.responseStatus = responseStatus;
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java
index bf28d97..c59612a 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java
@@ -20,34 +20,32 @@
package org.onap.sdc.impl;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.List;
-
+import org.onap.sdc.api.consumer.INotificationCallback;
import org.onap.sdc.api.notification.IArtifactInfo;
+import org.onap.sdc.api.notification.INotificationData;
import org.onap.sdc.api.notification.IResourceInstance;
import org.onap.sdc.api.results.IDistributionClientResult;
-import org.onap.sdc.utils.DistributionActionResultEnum;
-import org.onap.sdc.api.consumer.INotificationCallback;
-import org.onap.sdc.api.notification.INotificationData;
import org.onap.sdc.utils.ArtifactTypeEnum;
+import org.onap.sdc.utils.DistributionActionResultEnum;
+import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
class NotificationConsumer implements Runnable {
- private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName());
+ private static final Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName());
- private CambriaConsumer cambriaConsumer;
- private INotificationCallback clientCallback;
- private List<String> artifactsTypes;
- private DistributionClientImpl distributionClient;
+ private final SdcKafkaConsumer kafkaConsumer;
+ private final INotificationCallback clientCallback;
+ private final List<String> artifactsTypes;
+ private final DistributionClientImpl distributionClient;
- NotificationConsumer(CambriaConsumer cambriaConsumer, INotificationCallback clientCallback, List<String> artifactsTypes, DistributionClientImpl distributionClient) {
- this.cambriaConsumer = cambriaConsumer;
+ NotificationConsumer(SdcKafkaConsumer kafkaConsumer, INotificationCallback clientCallback, List<String> artifactsTypes, DistributionClientImpl distributionClient) {
+ this.kafkaConsumer = kafkaConsumer;
this.clientCallback = clientCallback;
this.artifactsTypes = artifactsTypes;
this.distributionClient = distributionClient;
@@ -55,16 +53,16 @@ class NotificationConsumer implements Runnable {
@Override
public void run() {
-
try {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
long currentTimeMillis = System.currentTimeMillis();
- for (String notificationMsg : cambriaConsumer.fetch()) {
+ log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName());
+ for (String notificationMsg : kafkaConsumer.poll()) {
log.debug("received message from topic");
- log.debug("recieved notification from broker: {}", notificationMsg);
+ log.debug("received notification from broker: {}", notificationMsg);
- final NotificationDataImpl notificationFromUEB = gson.fromJson(notificationMsg, NotificationDataImpl.class);
- NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromUEB);
+ final NotificationDataImpl notificationFromMessageBus = gson.fromJson(notificationMsg, NotificationDataImpl.class);
+ NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromMessageBus);
if (isActivateCallback(notificationForCallback)) {
String stringNotificationForCallback = gson.toJson(notificationForCallback);
log.debug("sending notification to client: {}", stringNotificationForCallback);
@@ -73,8 +71,8 @@ class NotificationConsumer implements Runnable {
}
} catch (Exception e) {
- log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage());
- log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e);
+ log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage());
+ log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e);
}
}
@@ -85,21 +83,21 @@ class NotificationConsumer implements Runnable {
return hasRelevantArtifactsInResourceInstance || hasRelevantArtifactsInService;
}
- protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromUEB) {
- List<IResourceInstance> relevantResourceInstances = buildResourceInstancesLogic(notificationFromUEB, currentTimeMillis);
- List<ArtifactInfoImpl> relevantServiceArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, notificationFromUEB.getServiceArtifactsImpl());
- notificationFromUEB.setResources(relevantResourceInstances);
- notificationFromUEB.setServiceArtifacts(relevantServiceArtifacts);
- return notificationFromUEB;
+ protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromMessageBus) {
+ List<IResourceInstance> relevantResourceInstances = buildResourceInstancesLogic(notificationFromMessageBus, currentTimeMillis);
+ List<ArtifactInfoImpl> relevantServiceArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, notificationFromMessageBus.getServiceArtifactsImpl());
+ notificationFromMessageBus.setResources(relevantResourceInstances);
+ notificationFromMessageBus.setServiceArtifacts(relevantServiceArtifacts);
+ return notificationFromMessageBus;
}
- private List<IResourceInstance> buildResourceInstancesLogic(NotificationDataImpl notificationFromUEB, long currentTimeMillis) {
+ private List<IResourceInstance> buildResourceInstancesLogic(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis) {
List<IResourceInstance> relevantResourceInstances = new ArrayList<>();
- for (JsonContainerResourceInstance resourceInstance : notificationFromUEB.getResourcesImpl()) {
+ for (JsonContainerResourceInstance resourceInstance : notificationFromMessageBus.getResourcesImpl()) {
final List<ArtifactInfoImpl> artifactsImplList = resourceInstance.getArtifactsImpl();
- List<ArtifactInfoImpl> foundRelevantArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, artifactsImplList);
+ List<ArtifactInfoImpl> foundRelevantArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, artifactsImplList);
if (!foundRelevantArtifacts.isEmpty() || distributionClient.getConfiguration().isFilterInEmptyResources()) {
resourceInstance.setArtifacts(foundRelevantArtifacts);
relevantResourceInstances.add(resourceInstance);
@@ -109,17 +107,17 @@ class NotificationConsumer implements Runnable {
}
- private List<ArtifactInfoImpl> handleRelevantArtifacts(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList) {
+ private List<ArtifactInfoImpl> handleRelevantArtifacts(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList) {
List<ArtifactInfoImpl> relevantArtifacts = new ArrayList<>();
if (artifactsImplList != null) {
for (ArtifactInfoImpl artifactInfo : artifactsImplList) {
- handleRelevantArtifact(notificationFromUEB, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo);
+ handleRelevantArtifact(notificationFromMessageBus, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo);
}
}
return relevantArtifacts;
}
- private void handleRelevantArtifact(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList, List<ArtifactInfoImpl> relevantArtifacts, ArtifactInfoImpl artifactInfo) {
+ private void handleRelevantArtifact(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList, List<ArtifactInfoImpl> relevantArtifacts, ArtifactInfoImpl artifactInfo) {
boolean isArtifactRelevant = artifactsTypes.contains(artifactInfo.getArtifactType());
String artifactType = artifactInfo.getArtifactType();
if (artifactInfo.getGeneratedFromUUID() != null && !artifactInfo.getGeneratedFromUUID().isEmpty()) {
@@ -131,16 +129,16 @@ class NotificationConsumer implements Runnable {
}
}
if (isArtifactRelevant) {
- setRelatedArtifacts(artifactInfo, notificationFromUEB);
+ setRelatedArtifacts(artifactInfo, notificationFromMessageBus);
if (artifactType.equals(ArtifactTypeEnum.HEAT.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_VOL.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_NET.name())) {
setGeneratedArtifact(artifactsImplList, artifactInfo);
}
relevantArtifacts.add(artifactInfo);
}
- IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromUEB.getDistributionID(), artifactInfo, isArtifactRelevant);
+ IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromMessageBus.getDistributionID(), artifactInfo, isArtifactRelevant);
if (notificationStatus.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) {
- log.error("Error failed to send notification status to UEB failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult());
+ log.error("Error failed to send notification status to MessageBus failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult());
}
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java
index 5951ed0..2c69330 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java
@@ -20,24 +20,23 @@
package org.onap.sdc.impl;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import org.onap.sdc.api.consumer.IStatusCallback;
import org.onap.sdc.api.notification.IStatusData;
+import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
class StatusConsumer implements Runnable {
- private static Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName());
+ private static final Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName());
- private CambriaConsumer cambriaConsumer;
- private IStatusCallback clientCallback;
+ private final SdcKafkaConsumer kafkaConsumer;
+ private final IStatusCallback clientCallback;
- StatusConsumer(CambriaConsumer cambriaConsumer, IStatusCallback clientCallback) {
- this.cambriaConsumer = cambriaConsumer;
+ StatusConsumer(SdcKafkaConsumer kafkaConsumer, IStatusCallback clientCallback) {
+ this.kafkaConsumer = kafkaConsumer;
this.clientCallback = clientCallback;
}
@@ -46,18 +45,16 @@ class StatusConsumer implements Runnable {
try {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
- for (String statusMsg : cambriaConsumer.fetch()) {
+ log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName());
+ for (String statusMsg : kafkaConsumer.poll()) {
log.debug("received message from topic");
- log.debug("recieved notification from broker: {}", statusMsg);
+ log.debug("received notification from broker: {}", statusMsg);
IStatusData statusData = gson.fromJson(statusMsg, StatusDataImpl.class);
clientCallback.activateCallback(statusData);
-
-
}
-
} catch (Exception e) {
- log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage());
- log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e);
+ log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage());
+ log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e);
}
}