diff options
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/impl')
6 files changed, 288 insertions, 413 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java index 2da9c4e..db4433b 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java @@ -28,7 +28,12 @@ import org.onap.sdc.utils.DistributionClientConstants; public class Configuration implements IConfiguration { private List<String> msgBusAddressList; - private String asdcAddress; + private final String kafkaSecurityProtocolConfig; + private final String kafkaSaslMechanism; + private final String kafkaSaslJaasConfig; + private String sdcStatusTopicName; + private String sdcNotificationTopicName; + private String sdcAddress; private String user; private String password; private int pollingInterval = DistributionClientConstants.MIN_POLLING_INTERVAL_SEC; @@ -40,10 +45,9 @@ public class Configuration implements IConfiguration { private String keyStorePath; private String keyStorePassword; private boolean activateServerTLSAuth; - private boolean filterInEmptyResources; - private Boolean useHttpsWithDmaap; + private final boolean filterInEmptyResources; private Boolean useHttpsWithSDC; - private boolean consumeProduceStatusTopic; + private final boolean consumeProduceStatusTopic; private String httpProxyHost; private int httpProxyPort; private String httpsProxyHost; @@ -51,23 +55,24 @@ public class Configuration implements IConfiguration { private boolean useSystemProxy; public Configuration(IConfiguration other) { - this.asdcAddress = other.getAsdcAddress(); - this.msgBusAddressList = other.getMsgBusAddress(); + this.kafkaSecurityProtocolConfig = other.getKafkaSecurityProtocolConfig(); + this.kafkaSaslMechanism = other.getKafkaSaslMechanism(); + this.kafkaSaslJaasConfig = other.getKafkaSaslJaasConfig(); this.comsumerID = other.getConsumerID(); this.consumerGroup = other.getConsumerGroup(); - this.environmentName = other.getEnvironmentName(); - this.password = other.getPassword(); this.pollingInterval = other.getPollingInterval(); this.pollingTimeout = other.getPollingTimeout(); - this.relevantArtifactTypes = other.getRelevantArtifactTypes(); + this.environmentName = other.getEnvironmentName(); + this.consumeProduceStatusTopic = other.isConsumeProduceStatusTopic(); + this.sdcAddress = other.getSdcAddress(); this.user = other.getUser(); + this.password = other.getPassword(); + this.relevantArtifactTypes = other.getRelevantArtifactTypes(); this.useHttpsWithSDC = other.isUseHttpsWithSDC(); this.keyStorePath = other.getKeyStorePath(); this.keyStorePassword = other.getKeyStorePassword(); this.activateServerTLSAuth = other.activateServerTLSAuth(); this.filterInEmptyResources = other.isFilterInEmptyResources(); - this.useHttpsWithDmaap = other.isUseHttpsWithDmaap(); - this.consumeProduceStatusTopic = other.isConsumeProduceStatusTopic(); this.httpProxyHost = other.getHttpProxyHost(); this.httpProxyPort = other.getHttpProxyPort(); this.httpsProxyHost = other.getHttpsProxyHost(); @@ -76,8 +81,24 @@ public class Configuration implements IConfiguration { } @Override - public String getAsdcAddress() { - return asdcAddress; + public String getSdcAddress() { + return sdcAddress; + } + + public String getStatusTopicName() { + return sdcStatusTopicName; + } + + public void setStatusTopicName(String sdcStatusTopicName) { + this.sdcStatusTopicName = sdcStatusTopicName; + } + + public String getNotificationTopicName() { + return sdcNotificationTopicName; + } + + public void setNotificationTopicName(String sdcNotificationTopicName) { + this.sdcNotificationTopicName = sdcNotificationTopicName; } @Override @@ -85,6 +106,25 @@ public class Configuration implements IConfiguration { return msgBusAddressList; } + public void setMsgBusAddress(List<String> newMsgBusAddress) { + msgBusAddressList = newMsgBusAddress; + } + + @Override + public String getKafkaSecurityProtocolConfig() { + return kafkaSecurityProtocolConfig; + } + + @Override + public String getKafkaSaslMechanism() { + return kafkaSaslMechanism; + } + + @Override + public String getKafkaSaslJaasConfig() { + return kafkaSaslJaasConfig; + } + @Override public Boolean isUseHttpsWithSDC() { return useHttpsWithSDC; @@ -169,8 +209,8 @@ public class Configuration implements IConfiguration { this.comsumerID = comsumerID; } - public void setAsdcAddress(String asdcAddress) { - this.asdcAddress = asdcAddress; + public void setSdcAddress(String sdcAddress) { + this.sdcAddress = sdcAddress; } public void setUser(String user) { @@ -243,19 +283,10 @@ public class Configuration implements IConfiguration { return this.filterInEmptyResources; } - @Override - public Boolean isUseHttpsWithDmaap() { - return this.useHttpsWithDmaap; - } - public void setUseHttpsWithSDC(boolean useHttpsWithSDC) { this.useHttpsWithSDC = useHttpsWithSDC; } - public void setUseHttpsWithDmaap(boolean useHttpsWithDmaap) { - this.useHttpsWithDmaap = useHttpsWithDmaap; - } - @Override public boolean isConsumeProduceStatusTopic() { return this.consumeProduceStatusTopic; @@ -265,11 +296,13 @@ public class Configuration implements IConfiguration { public String toString() { //@formatter:off return "Configuration [" - + "asdcAddress=" + asdcAddress + + "sdcAddress=" + sdcAddress + ", user=" + user + ", password=" + password + ", useHttpsWithSDC=" + useHttpsWithSDC + ", pollingInterval=" + pollingInterval + + ", sdcStatusTopicName=" + sdcStatusTopicName + + ", sdcNotificationTopicName=" + sdcNotificationTopicName + ", pollingTimeout=" + pollingTimeout + ", relevantArtifactTypes=" + relevantArtifactTypes + ", consumerGroup=" + consumerGroup @@ -279,7 +312,6 @@ public class Configuration implements IConfiguration { + ", keyStorePassword=" + keyStorePassword + ", activateServerTLSAuth=" + activateServerTLSAuth + ", filterInEmptyResources=" + filterInEmptyResources - + ", useHttpsWithDmaap=" + useHttpsWithDmaap + ", consumeProduceStatusTopic=" + consumeProduceStatusTopic + ", useSystemProxy=" + useSystemProxy + ", httpProxyHost=" + httpProxyHost diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java index b645ed1..829c6ce 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/ConfigurationValidator.java @@ -19,23 +19,22 @@ */ package org.onap.sdc.impl; -import org.onap.sdc.api.consumer.IConfiguration; -import org.onap.sdc.api.consumer.IStatusCallback; -import org.onap.sdc.utils.DistributionActionResultEnum; -import org.onap.sdc.utils.DistributionClientConstants; - import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Function; import java.util.regex.Matcher; +import org.onap.sdc.api.consumer.IConfiguration; +import org.onap.sdc.api.consumer.IStatusCallback; +import org.onap.sdc.utils.DistributionActionResultEnum; +import org.onap.sdc.utils.DistributionClientConstants; public class ConfigurationValidator { private Map<Function<IConfiguration, Boolean>, DistributionActionResultEnum> cachedValidators; - DistributionActionResultEnum validateConfiguration(IConfiguration conf, IStatusCallback statusCallback) { + public DistributionActionResultEnum validateConfiguration(IConfiguration conf, IStatusCallback statusCallback) { final Map<Function<IConfiguration, Boolean>, DistributionActionResultEnum> validators = getValidators(statusCallback); for (Map.Entry<Function<IConfiguration, Boolean>, DistributionActionResultEnum> validation : validators.entrySet()) { @@ -53,10 +52,8 @@ public class ConfigurationValidator { validators.put(isCustomerIdNotSet(), DistributionActionResultEnum.CONF_MISSING_CONSUMER_ID); validators.put(isUserNotSet(), DistributionActionResultEnum.CONF_MISSING_USERNAME); validators.put(isPasswordNotSet(), DistributionActionResultEnum.CONF_MISSING_PASSWORD); - validators.put(isMsgBusAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS); - validators.put(isAsdcAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_ASDC_FQDN); - validators.put(isFqdnValid(), DistributionActionResultEnum.CONF_INVALID_ASDC_FQDN); - validators.put(areFqdnsValid(), DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS); + validators.put(isSdcAddressNotSet(), DistributionActionResultEnum.CONF_MISSING_SDC_FQDN); + validators.put(isFqdnValid(), DistributionActionResultEnum.CONF_INVALID_SDC_FQDN); validators.put(isEnvNameNotSet(), DistributionActionResultEnum.CONF_MISSING_ENVIRONMENT_NAME); validators.put(isRelevantArtifactTypesNotSet(), DistributionActionResultEnum.CONF_MISSING_ARTIFACT_TYPES); validators.put(isConsumeStatusTopicWithCallbackNotSet(statusCallback), DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG); @@ -85,17 +82,10 @@ public class ConfigurationValidator { return it -> it.getEnvironmentName() == null || it.getEnvironmentName().isEmpty(); } - private Function<IConfiguration, Boolean> areFqdnsValid() { - return it -> !isValidFqdns(it.getMsgBusAddress()); - } - private Function<IConfiguration, Boolean> isFqdnValid() { - return it -> !isValidFqdn(it.getAsdcAddress()); + return it -> !isValidFqdn(it.getSdcAddress()); } - private Function<IConfiguration, Boolean> isMsgBusAddressNotSet() { - return it -> it.getMsgBusAddress() == null || it.getMsgBusAddress().isEmpty(); - } private Function<IConfiguration, Boolean> isPasswordNotSet() { return it -> it.getPassword() == null || it.getPassword().isEmpty(); @@ -113,8 +103,8 @@ public class ConfigurationValidator { return it -> it.getConsumerID() == null || it.getConsumerID().isEmpty(); } - private Function<IConfiguration, Boolean> isAsdcAddressNotSet() { - return it -> it.getAsdcAddress() == null || it.getAsdcAddress().isEmpty(); + private Function<IConfiguration, Boolean> isSdcAddressNotSet() { + return it -> it.getSdcAddress() == null || it.getSdcAddress().isEmpty(); } static boolean isValidFqdn(String fqdn) { diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java index 3a25abb..a34ba1e 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java @@ -23,11 +23,12 @@ package org.onap.sdc.impl; import static java.util.Objects.isNull; -import java.io.IOException; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import fj.data.Either; import java.lang.reflect.Type; -import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; -import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -35,8 +36,8 @@ import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import org.apache.http.HttpHost; +import org.apache.kafka.common.KafkaException; import org.onap.sdc.api.IDistributionClient; import org.onap.sdc.api.IDistributionStatusMessageJsonBuilder; import org.onap.sdc.api.consumer.IComponentDoneStatusMessage; @@ -46,53 +47,35 @@ import org.onap.sdc.api.consumer.IFinalDistrStatusMessage; import org.onap.sdc.api.consumer.INotificationCallback; import org.onap.sdc.api.consumer.IStatusCallback; import org.onap.sdc.api.notification.IArtifactInfo; +import org.onap.sdc.api.notification.IVfModuleMetadata; import org.onap.sdc.api.results.IDistributionClientDownloadResult; import org.onap.sdc.api.results.IDistributionClientResult; -import org.onap.sdc.http.HttpAsdcClient; +import org.onap.sdc.http.HttpClientFactory; +import org.onap.sdc.http.HttpRequestFactory; +import org.onap.sdc.http.HttpSdcClient; import org.onap.sdc.http.SdcConnectorClient; -import org.onap.sdc.http.TopicRegistrationResponse; import org.onap.sdc.utils.DistributionActionResultEnum; import org.onap.sdc.utils.DistributionClientConstants; -import org.onap.sdc.utils.GeneralUtils; import org.onap.sdc.utils.NotificationSender; import org.onap.sdc.utils.Pair; import org.onap.sdc.utils.Wrapper; -import org.onap.sdc.api.notification.IVfModuleMetadata; +import org.onap.sdc.utils.kafka.KafkaDataResponse; +import org.onap.sdc.utils.kafka.SdcKafkaConsumer; +import org.onap.sdc.utils.kafka.SdcKafkaProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.apiClient.credentials.ApiCredential; -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClient.CambriaApiException; -import com.att.nsa.cambria.client.CambriaClientBuilders.AbstractAuthenticatedManagerBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.att.nsa.cambria.client.CambriaIdentityManager; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.reflect.TypeToken; - -import fj.data.Either; - public class DistributionClientImpl implements IDistributionClient { - private static final int POLLING_TIMEOUT_MULTIPLIER = 1000; private static final int TERMINATION_TIMEOUT = 60; private final Logger log; - private SdcConnectorClient asdcConnector; + private SdcConnectorClient sdcConnector; private ScheduledExecutorService executorPool = null; - protected CambriaIdentityManager cambriaIdentityManager = null; - private List<String> brokerServers; - private ApiCredential credential; + private SdcKafkaProducer producer; protected Configuration configuration; private INotificationCallback callback; private IStatusCallback statusCallback; - private String notificationTopic; - private String statusTopic; private boolean isConsumerGroupGenerated = false; private NotificationSender notificationSender; private final ConfigurationValidator configurationValidator = new ConfigurationValidator(); @@ -110,12 +93,69 @@ public class DistributionClientImpl implements IDistributionClient { } @Override + public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback, IStatusCallback statusCallback) { + IDistributionClientResult initResult; + if (!conf.isConsumeProduceStatusTopic()) { + initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, + "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'"); + + } else if (isNull(statusCallback)) { + initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, + "configuration is invalid: statusCallback is not defined"); + } else { + this.statusCallback = statusCallback; + initResult = init(conf, notificationCallback); + } + return initResult; + } + + @Override + public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback callback) { + + log.info("DistributionClient - init"); + + Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); + validateNotInitilized(errorWrapper); + if (errorWrapper.isEmpty()) { + validateNotTerminated(errorWrapper); + } + if (errorWrapper.isEmpty()) { + this.configuration = validateAndInitConfiguration(errorWrapper, conf).getSecond(); + this.sdcConnector = createSdcConnector(configuration); + } + if (errorWrapper.isEmpty()) { + validateArtifactTypesWithSdcServer(conf, errorWrapper); + } + if (errorWrapper.isEmpty()) { + this.callback = callback; + } + if (errorWrapper.isEmpty()) { + initKafkaData(errorWrapper); + } + if (errorWrapper.isEmpty()) { + initKafkaProducer(errorWrapper, configuration); + } + if (errorWrapper.isEmpty()) { + this.notificationSender = new NotificationSender(producer); + } + IDistributionClientResult result; + if (errorWrapper.isEmpty()) { + isInitialized = true; + result = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, + "distribution client initialized successfully"); + } else { + result = errorWrapper.getInnerElement(); + } + + return result; + } + + @Override public IConfiguration getConfiguration() { return configuration; } @Override - /* see javadoc */ public synchronized IDistributionClientResult updateConfiguration(IConfiguration conf) { log.info("update DistributionClient configuration"); @@ -126,33 +166,34 @@ public class DistributionClientImpl implements IDistributionClient { return errorWrapper.getInnerElement(); } - IDistributionClientResult updateResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "configuration updated successfuly"); + IDistributionClientResult updateResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, + "configuration updated successfully"); - boolean needToUpdateCambriaConsumer = false; + boolean needToUpdateConsumer = false; if (conf.getRelevantArtifactTypes() != null && !conf.getRelevantArtifactTypes().isEmpty()) { configuration.setRelevantArtifactTypes(conf.getRelevantArtifactTypes()); - needToUpdateCambriaConsumer = true; + needToUpdateConsumer = true; } if (isPollingIntervalValid(conf.getPollingInterval())) { configuration.setPollingInterval(conf.getPollingInterval()); - needToUpdateCambriaConsumer = true; + needToUpdateConsumer = true; } if (isPollingTimeoutValid(conf.getPollingTimeout())) { configuration.setPollingTimeout(conf.getPollingTimeout()); - needToUpdateCambriaConsumer = true; + needToUpdateConsumer = true; } if (conf.getConsumerGroup() != null) { configuration.setConsumerGroup(conf.getConsumerGroup()); isConsumerGroupGenerated = false; - needToUpdateCambriaConsumer = true; + needToUpdateConsumer = true; } else if (!isConsumerGroupGenerated) { String generatedConsumerGroup = UUID.randomUUID().toString(); configuration.setConsumerGroup(generatedConsumerGroup); isConsumerGroupGenerated = true; } - if (needToUpdateCambriaConsumer) { + if (needToUpdateConsumer) { updateResult = restartConsumer(); } @@ -167,7 +208,7 @@ public class DistributionClientImpl implements IDistributionClient { log.info("start DistributionClient"); IDistributionClientResult startResult; - CambriaConsumer cambriaNotificationConsumer = null; + SdcKafkaConsumer kafkaConsumer = null; Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); validateRunReady(errorWrapper); if (errorWrapper.isEmpty()) { @@ -175,49 +216,49 @@ public class DistributionClientImpl implements IDistributionClient { } if (errorWrapper.isEmpty()) { try { - cambriaNotificationConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(notificationTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) - .withSocketTimeout(configuration.getPollingTimeout() * POLLING_TIMEOUT_MULTIPLIER).build(); - - } catch (MalformedURLException | GeneralSecurityException e) { - handleCambriaInitFailure(errorWrapper, e); + kafkaConsumer = new SdcKafkaConsumer(configuration); + kafkaConsumer.subscribe(configuration.getNotificationTopicName()); + } catch (KafkaException | IllegalArgumentException e) { + handleMessagingClientInitFailure(errorWrapper, e); } } if (errorWrapper.isEmpty()) { - - List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes(); - // Remove nulls from list - workaround for how configuration is built - relevantArtifactTypes.removeAll(Collections.singleton(null)); - NotificationConsumer consumer = new NotificationConsumer(cambriaNotificationConsumer, callback, relevantArtifactTypes, this); - executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); - executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); - - handleStatusConsumer(errorWrapper, executorPool); + startNotificationConsumer(kafkaConsumer); + startStatusConsumer(errorWrapper, executorPool); } if (!errorWrapper.isEmpty()) { startResult = errorWrapper.getInnerElement(); } else { - startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client started successfuly"); + startResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, + "distribution client started successfully"); isStarted = true; } return startResult; } - private void handleStatusConsumer(Wrapper<IDistributionClientResult> errorWrapper, ScheduledExecutorService executorPool) { + private void startNotificationConsumer(SdcKafkaConsumer kafkaConsumer) { + List<String> relevantArtifactTypes = configuration.getRelevantArtifactTypes(); + // Remove nulls from list - workaround for how configuration is built + relevantArtifactTypes.removeAll(Collections.singleton(null)); + NotificationConsumer consumer = new NotificationConsumer(kafkaConsumer, callback, relevantArtifactTypes, this); + executorPool = Executors.newScheduledThreadPool(DistributionClientConstants.POOL_SIZE); + executorPool.scheduleAtFixedRate(consumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); + } + + private void startStatusConsumer(Wrapper<IDistributionClientResult> errorWrapper, ScheduledExecutorService executorPool) { if (configuration.isConsumeProduceStatusTopic()) { - CambriaConsumer cambriaStatusConsumer; try { - cambriaStatusConsumer = new ConsumerBuilder().authenticatedBy(credential.getApiKey(), credential.getApiSecret()).knownAs(configuration.getConsumerGroup(), configuration.getConsumerID()).onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers) - .withSocketTimeout(configuration.getPollingTimeout() * POLLING_TIMEOUT_MULTIPLIER).build(); - StatusConsumer statusConsumer = new StatusConsumer(cambriaStatusConsumer, statusCallback); + SdcKafkaConsumer kafkaConsumer = new SdcKafkaConsumer(configuration); + kafkaConsumer.subscribe(configuration.getStatusTopicName()); + StatusConsumer statusConsumer = new StatusConsumer(kafkaConsumer, statusCallback); executorPool.scheduleAtFixedRate(statusConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); - } catch (MalformedURLException | GeneralSecurityException e) { - handleCambriaInitFailure(errorWrapper, e); + } catch (KafkaException | IllegalArgumentException e) { + handleMessagingClientInitFailure(errorWrapper, e); } } } @Override - /* see javadoc */ public synchronized IDistributionClientResult stop() { log.info("stop DistributionClient"); @@ -226,30 +267,13 @@ public class DistributionClientImpl implements IDistributionClient { if (!errorWrapper.isEmpty()) { return errorWrapper.getInnerElement(); } - // 1. stop polling notification topic shutdownExecutor(); - // 2. send to ASDC unregister to topic - IDistributionClientResult unregisterResult = asdcConnector.unregisterTopics(credential); - if (unregisterResult.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { - log.info("client failed to unregister from topics"); - } else { - log.info("client unregistered from topics successfully"); - } - asdcConnector.close(); - - try { - cambriaIdentityManager.deleteCurrentApiKey(); - } catch (HttpException | IOException e) { - log.debug("failed to delete cambria keys", e); - } - cambriaIdentityManager.close(); - + sdcConnector.close(); isInitialized = false; isTerminated = true; - DistributionClientResultImpl stopResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client stopped successfuly"); - return stopResult; + return new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client stopped successfully"); } @Override @@ -259,167 +283,59 @@ public class DistributionClientImpl implements IDistributionClient { validateRunReady(errorWrapper); if (!errorWrapper.isEmpty()) { IDistributionClientResult result = errorWrapper.getInnerElement(); - IDistributionClientDownloadResult downloadResult = new DistributionClientDownloadResultImpl(result.getDistributionActionResult(), result.getDistributionMessageResult()); - return downloadResult; + return new DistributionClientDownloadResultImpl(result.getDistributionActionResult(), result.getDistributionMessageResult()); } - return asdcConnector.downloadArtifact(artifactInfo); + return sdcConnector.downloadArtifact(artifactInfo); } - @Override - public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback notificationCallback, - IStatusCallback statusCallback) { - IDistributionClientResult initResult; - if (!conf.isConsumeProduceStatusTopic()) { - initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: isConsumeProduceStatusTopic() should be set to 'true'"); - - } else if (isNull(statusCallback)) { - initResult = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, "configuration is invalid: statusCallback is not defined"); - } else { - this.statusCallback = statusCallback; - initResult = init(conf, notificationCallback); - } - return initResult; + SdcConnectorClient createSdcConnector(Configuration configuration) { + return new SdcConnectorClient(configuration, new HttpSdcClient(configuration.getSdcAddress(), + new HttpClientFactory(configuration), + new HttpRequestFactory(configuration.getUser(), configuration.getPassword()))); } - @Override - /* - * see javadoc - */ - public synchronized IDistributionClientResult init(IConfiguration conf, INotificationCallback callback) { - - log.info("DistributionClient - init"); - - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateNotInitilized(errorWrapper); - if (errorWrapper.isEmpty()) { - validateNotTerminated(errorWrapper); - } - if (errorWrapper.isEmpty()) { - this.configuration = validateAndInitConfiguration(errorWrapper, conf).getSecond(); - this.asdcConnector = createAsdcConnector(this.configuration); - } - // 1. get ueb server list from configuration - if (errorWrapper.isEmpty()) { - List<String> servers = initUebServerList(errorWrapper); - if (servers != null) { - this.brokerServers = servers; - } - } - // 2.validate artifact types against asdc server - if (errorWrapper.isEmpty()) { - validateArtifactTypesWithAsdcServer(conf, errorWrapper); - } - // 3. create keys - if (errorWrapper.isEmpty()) { - this.callback = callback; - ApiCredential apiCredential = createUebKeys(errorWrapper); - if (apiCredential != null) { - this.credential = apiCredential; - } - } - // 4. register for topics - if (errorWrapper.isEmpty()) { - TopicRegistrationResponse topics = registerForTopics(errorWrapper, this.credential); - if (topics != null) { - this.notificationTopic = topics.getDistrNotificationTopicName(); - this.statusTopic = topics.getDistrStatusTopicName(); - this.notificationSender = createNotificationSender(); - } - } - - IDistributionClientResult result; - if (errorWrapper.isEmpty()) { - isInitialized = true; - result = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "distribution client initialized successfuly"); - } else { - result = errorWrapper.getInnerElement(); - } - - return result; - } - - SdcConnectorClient createAsdcConnector(Configuration configuration) { - return new SdcConnectorClient(configuration, new HttpAsdcClient(configuration)); - } - - private NotificationSender createNotificationSender() { - return new NotificationSender(brokerServers); - } - - private TopicRegistrationResponse registerForTopics(Wrapper<IDistributionClientResult> errorWrapper, ApiCredential credential) { - Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics = asdcConnector.registerAsdcTopics(credential); - if (registerAsdcTopics.isRight()) { - - try { - cambriaIdentityManager.deleteCurrentApiKey(); - } catch (HttpException | IOException e) { - log.debug("failed to delete cambria keys", e); - } - errorWrapper.setInnerElement(registerAsdcTopics.right().value()); - } else { - return registerAsdcTopics.left().value(); - } - return null; - } - - private ApiCredential createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) { - ApiCredential apiCredential = null; - - initCambriaClient(errorWrapper); - if (errorWrapper.isEmpty()) { - log.debug("create keys"); - Pair<DistributionClientResultImpl, ApiCredential> uebKeys = createUebKeys(); - DistributionClientResultImpl createKeysResponse = uebKeys.getFirst(); - apiCredential = uebKeys.getSecond(); - if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { - errorWrapper.setInnerElement(createKeysResponse); - } - } - return apiCredential; - } - - private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) { - Either<List<String>, IDistributionClientResult> eitherValidArtifactTypesList = asdcConnector.getValidArtifactTypesList(); + private void validateArtifactTypesWithSdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) { + Either<List<String>, IDistributionClientResult> eitherValidArtifactTypesList = sdcConnector.getValidArtifactTypesList(); if (eitherValidArtifactTypesList.isRight()) { DistributionActionResultEnum errorType = eitherValidArtifactTypesList.right().value().getDistributionActionResult(); - // Support the case of a new client and older ASDC Server which does not have the API - if (errorType != DistributionActionResultEnum.ASDC_NOT_FOUND) { + // Support the case of a new client and older SDC Server which does not have the API + if (errorType != DistributionActionResultEnum.SDC_NOT_FOUND) { errorWrapper.setInnerElement(eitherValidArtifactTypesList.right().value()); } } else { - final List<String> artifactTypesFromAsdc = eitherValidArtifactTypesList.left().value(); - boolean isArtifactTypesValid = artifactTypesFromAsdc.containsAll(conf.getRelevantArtifactTypes()); + final List<String> artifactTypesFromSdc = eitherValidArtifactTypesList.left().value(); + boolean isArtifactTypesValid = artifactTypesFromSdc.containsAll(conf.getRelevantArtifactTypes()); if (!isArtifactTypesValid) { - List<String> invalidArtifactTypes = new ArrayList<>(); - invalidArtifactTypes.addAll(conf.getRelevantArtifactTypes()); - invalidArtifactTypes.removeAll(artifactTypesFromAsdc); + List<String> invalidArtifactTypes = new ArrayList<>(conf.getRelevantArtifactTypes()); + invalidArtifactTypes.removeAll(artifactTypesFromSdc); DistributionClientResultImpl errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CONF_CONTAINS_INVALID_ARTIFACT_TYPES, - "configuration contains invalid artifact types:" + invalidArtifactTypes + " valid types are:" + artifactTypesFromAsdc); + "configuration contains invalid artifact types:" + invalidArtifactTypes + " valid types are:" + artifactTypesFromSdc); errorWrapper.setInnerElement(errorResponse); } else { - log.debug("Artifact types: {} were validated with ASDC server", conf.getRelevantArtifactTypes()); + log.debug("Artifact types: {} were validated with SDC server", conf.getRelevantArtifactTypes()); } } } - private List<String> initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) { - List<String> brokerServers = null; - log.debug("get ueb cluster server list from component(configuration file)"); - - Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList(); - if (serverListResponse.isRight()) { - errorWrapper.setInnerElement(serverListResponse.right().value()); + private void initKafkaData(Wrapper<IDistributionClientResult> errorWrapper) { + log.debug("Get MessageBus cluster information from SDC"); + Either<KafkaDataResponse, IDistributionClientResult> kafkaData = sdcConnector.getKafkaDistData(); + if (kafkaData.isRight()) { + errorWrapper.setInnerElement(kafkaData.right().value()); } else { - brokerServers = serverListResponse.left().value(); + KafkaDataResponse kafkaDataResponse = kafkaData.left().value(); + configuration.setMsgBusAddress(Collections.singletonList(kafkaDataResponse.getKafkaBootStrapServer())); + configuration.setNotificationTopicName(kafkaDataResponse.getDistrNotificationTopicName()); + configuration.setStatusTopicName(kafkaDataResponse.getDistrStatusTopicName()); + log.debug("MessageBus cluster info retrieved successfully {}", kafkaData.left().value()); } - - return brokerServers; } private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) { if (isInitialized) { log.warn("distribution client already initialized"); - DistributionClientResultImpl alreadyInitResponse = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, "distribution client already initialized"); + DistributionClientResultImpl alreadyInitResponse = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_INITIALIZED, + "distribution client already initialized"); errorWrapper.setInnerElement(alreadyInitResponse); } } @@ -431,31 +347,17 @@ public class DistributionClientImpl implements IDistributionClient { } private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder statusBuilder) { - IDistributionClientResult distributionResult; - - Either<CambriaBatchingPublisher, IDistributionClientResult> cambriaPublisher = getCambriaPublisher(statusTopic, configuration, brokerServers, credential); - if (cambriaPublisher.isRight()) { - distributionResult = cambriaPublisher.right().value(); - } else { - String statusMessage = statusBuilder.build(); - distributionResult = notificationSender.send(cambriaPublisher.left().value(), statusMessage); - } - - return distributionResult; + return notificationSender.send(configuration.getStatusTopicName(), statusBuilder.build()); } - private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher(String statusTopic, Configuration configuration, List<String> brokerServers, ApiCredential credential) { - CambriaBatchingPublisher cambriaPublisher = null; + private void initKafkaProducer(Wrapper<IDistributionClientResult> errorWrapper, Configuration configuration) { try { - cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()) - .usingHosts(brokerServers).build(); - cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); - } catch (MalformedURLException | GeneralSecurityException e) { - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - handleCambriaInitFailure(errorWrapper, e); - return Either.right(errorWrapper.getInnerElement()); + if (producer == null) { + producer = new SdcKafkaProducer(configuration); + } + } catch (KafkaException | IllegalStateException e) { + handleMessagingClientInitFailure(errorWrapper, e); } - return Either.left(cambriaPublisher); } @Override @@ -471,27 +373,17 @@ public class DistributionClientImpl implements IDistributionClient { if (!errorWrapper.isEmpty()) { return errorWrapper.getInnerElement(); } - IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified); + IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus( + getConfiguration().getConsumerID(), + currentTimeMillis, + distributionId, + artifactInfo, + isNotified); return sendStatus(builder); } /* *************************** Private Methods *************************************************** */ - protected Pair<DistributionClientResultImpl, ApiCredential> createUebKeys() { - DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly"); - ApiCredential credential = null; - try { - String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID()); - credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description); - cambriaIdentityManager.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); - - } catch (HttpException | CambriaApiException | IOException e) { - response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage()); - log.error(response.toString()); - } - return new Pair<>(response, credential); - } - private IDistributionClientResult restartConsumer() { shutdownExecutor(); return start(); @@ -500,43 +392,36 @@ public class DistributionClientImpl implements IDistributionClient { protected Pair<DistributionActionResultEnum, Configuration> validateAndInitConfiguration(Wrapper<IDistributionClientResult> errorWrapper, IConfiguration conf) { DistributionActionResultEnum result = configurationValidator.validateConfiguration(conf, statusCallback); - Configuration configuration = null; + Configuration configurationInit = null; if (result == DistributionActionResultEnum.SUCCESS) { - configuration = createConfiguration(conf); + configurationInit = createConfiguration(conf); } else { DistributionClientResultImpl initResult = new DistributionClientResultImpl(result, "configuration is invalid: " + result.name()); log.error(initResult.toString()); errorWrapper.setInnerElement(initResult); } - return new Pair<>(result, configuration); + return new Pair<>(result, configurationInit); } private Configuration createConfiguration(IConfiguration conf) { - Configuration configuration = new Configuration(conf); + Configuration configurationCreate = new Configuration(conf); if (!isPollingIntervalValid(conf.getPollingInterval())) { - configuration.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); + configurationCreate.setPollingInterval(DistributionClientConstants.MIN_POLLING_INTERVAL_SEC); } if (!isPollingTimeoutValid(conf.getPollingTimeout())) { - configuration.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC); + configurationCreate.setPollingTimeout(DistributionClientConstants.POLLING_TIMEOUT_SEC); } if (conf.getConsumerGroup() == null) { String generatedConsumerGroup = UUID.randomUUID().toString(); - configuration.setConsumerGroup(generatedConsumerGroup); + configurationCreate.setConsumerGroup(generatedConsumerGroup); isConsumerGroupGenerated = true; } - //Default use HTTPS with SDC if (conf.isUseHttpsWithSDC() == null) { - configuration.setUseHttpsWithSDC(true); + configurationCreate.setUseHttpsWithSDC(true); } - - //Default use HTTPS with DMAAP - if (conf.isUseHttpsWithDmaap() == null) { - configuration.setUseHttpsWithDmaap(true); - } - - return configuration; + return configurationCreate; } private void shutdownExecutor() { @@ -577,7 +462,8 @@ public class DistributionClientImpl implements IDistributionClient { private void validateInitialized(Wrapper<IDistributionClientResult> errorWrapper) { if (!isInitialized) { log.debug("client was not initialized"); - IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, + "distribution client was not initialized"); errorWrapper.setInnerElement(result); } } @@ -585,7 +471,8 @@ public class DistributionClientImpl implements IDistributionClient { private void validateNotStarted(Wrapper<IDistributionClientResult> errorWrapper) { if (isStarted) { log.debug("client already started"); - IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_STARTED, "distribution client already started"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_ALREADY_STARTED, + "distribution client already started"); errorWrapper.setInnerElement(result); } } @@ -593,7 +480,8 @@ public class DistributionClientImpl implements IDistributionClient { private void validateNotTerminated(Wrapper<IDistributionClientResult> errorWrapper) { if (isTerminated) { log.debug("client was terminated"); - IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_IS_TERMINATED, "distribution client was terminated"); + IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_IS_TERMINATED, + "distribution client was terminated"); errorWrapper.setInnerElement(result); } } @@ -616,23 +504,10 @@ public class DistributionClientImpl implements IDistributionClient { return isValid; } - private synchronized void initCambriaClient(Wrapper<IDistributionClientResult> errorWrapper) { - if (cambriaIdentityManager == null) { - try { - AbstractAuthenticatedManagerBuilder<CambriaIdentityManager> managerBuilder = new IdentityManagerBuilder().usingHosts(brokerServers); - if (configuration.isUseHttpsWithDmaap()) { - managerBuilder = managerBuilder.usingHttps(); - } - cambriaIdentityManager = managerBuilder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - handleCambriaInitFailure(errorWrapper, e); - } - } - } - private void handleCambriaInitFailure(Wrapper<IDistributionClientResult> errorWrapper, Exception e) { - final String errorMessage = "Failed initilizing cambria component:" + e.getMessage(); - IDistributionClientResult errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.CAMBRIA_INIT_FAILED, errorMessage); + private void handleMessagingClientInitFailure(Wrapper<IDistributionClientResult> errorWrapper, Exception e) { + final String errorMessage = "Failed initializing messaging component:" + e.getMessage(); + IDistributionClientResult errorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.MESSAGING_CLIENT_INIT_FAILED, errorMessage); errorWrapper.setInnerElement(errorResponse); log.error(errorMessage); log.debug(errorMessage, e); @@ -682,8 +557,7 @@ public class DistributionClientImpl implements IDistributionClient { String vfModuleJsonString = new String(artifactPayload, StandardCharsets.UTF_8); final Type type = new TypeToken<List<VfModuleMetadata>>() { }.getType(); - List<IVfModuleMetadata> vfModules = gson.fromJson(vfModuleJsonString, type); - return vfModules; + return gson.fromJson(vfModuleJsonString, type); } @@ -702,27 +576,12 @@ public class DistributionClientImpl implements IDistributionClient { } - - public Either<List<String>, IDistributionClientResult> getUEBServerList() { - List<String> msgBusAddresses = configuration.getMsgBusAddress(); - if (msgBusAddresses.isEmpty()) { - return Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_MISSING_MSG_BUS_ADDRESS, "Message bus address was not found in the config file")); - } else if (getHttpProxyHost() == null && getHttpsProxyHost() == null) { - // If there is no proxy configured, convert to valid host name - return GeneralUtils.convertToValidHostName(msgBusAddresses); - } else { - // skip the IP address lookup when proxy is configured and treat all - // hosts as valid - return Either.left(msgBusAddresses); - } - } private HttpHost getHttpProxyHost() { HttpHost proxyHost = null; - if (configuration.isUseSystemProxy() && System.getProperty("http.proxyHost") != null - && System.getProperty("http.proxyPort") != null) { + if (Boolean.TRUE.equals(configuration.isUseSystemProxy() && System.getProperty("http.proxyHost") != null) && System.getProperty("http.proxyPort") != null) { proxyHost = new HttpHost(System.getProperty("http.proxyHost"), - Integer.valueOf(System.getProperty("http.proxyPort"))); + Integer.parseInt(System.getProperty("http.proxyPort"))); } else if (configuration.getHttpProxyHost() != null && configuration.getHttpProxyPort() != 0) { proxyHost = new HttpHost(configuration.getHttpProxyHost(), configuration.getHttpProxyPort()); } @@ -731,10 +590,9 @@ public class DistributionClientImpl implements IDistributionClient { private HttpHost getHttpsProxyHost() { HttpHost proxyHost = null; - if (configuration.isUseSystemProxy() && System.getProperty("https.proxyHost") != null - && System.getProperty("https.proxyPort") != null) { + if (configuration.isUseSystemProxy() && System.getProperty("https.proxyHost") != null && System.getProperty("https.proxyPort") != null) { proxyHost = new HttpHost(System.getProperty("https.proxyHost"), - Integer.valueOf(System.getProperty("https.proxyPort"))); + Integer.parseInt(System.getProperty("https.proxyPort"))); } else if (configuration.getHttpsProxyHost() != null && configuration.getHttpsProxyPort() != 0) { proxyHost = new HttpHost(configuration.getHttpsProxyHost(), configuration.getHttpsProxyPort()); } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java index 4edd355..77655ac 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientResultImpl.java @@ -25,8 +25,8 @@ import org.onap.sdc.utils.DistributionActionResultEnum; public class DistributionClientResultImpl implements IDistributionClientResult { - private DistributionActionResultEnum responseStatus; - private String responseMessage; + private final DistributionActionResultEnum responseStatus; + private final String responseMessage; public DistributionClientResultImpl(DistributionActionResultEnum responseStatus, String responseMessage) { this.responseStatus = responseStatus; diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java index bf28d97..c59612a 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java @@ -20,34 +20,32 @@ package org.onap.sdc.impl; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import java.util.ArrayList; import java.util.List; - +import org.onap.sdc.api.consumer.INotificationCallback; import org.onap.sdc.api.notification.IArtifactInfo; +import org.onap.sdc.api.notification.INotificationData; import org.onap.sdc.api.notification.IResourceInstance; import org.onap.sdc.api.results.IDistributionClientResult; -import org.onap.sdc.utils.DistributionActionResultEnum; -import org.onap.sdc.api.consumer.INotificationCallback; -import org.onap.sdc.api.notification.INotificationData; import org.onap.sdc.utils.ArtifactTypeEnum; +import org.onap.sdc.utils.DistributionActionResultEnum; +import org.onap.sdc.utils.kafka.SdcKafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - class NotificationConsumer implements Runnable { - private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName()); + private static final Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName()); - private CambriaConsumer cambriaConsumer; - private INotificationCallback clientCallback; - private List<String> artifactsTypes; - private DistributionClientImpl distributionClient; + private final SdcKafkaConsumer kafkaConsumer; + private final INotificationCallback clientCallback; + private final List<String> artifactsTypes; + private final DistributionClientImpl distributionClient; - NotificationConsumer(CambriaConsumer cambriaConsumer, INotificationCallback clientCallback, List<String> artifactsTypes, DistributionClientImpl distributionClient) { - this.cambriaConsumer = cambriaConsumer; + NotificationConsumer(SdcKafkaConsumer kafkaConsumer, INotificationCallback clientCallback, List<String> artifactsTypes, DistributionClientImpl distributionClient) { + this.kafkaConsumer = kafkaConsumer; this.clientCallback = clientCallback; this.artifactsTypes = artifactsTypes; this.distributionClient = distributionClient; @@ -55,16 +53,16 @@ class NotificationConsumer implements Runnable { @Override public void run() { - try { Gson gson = new GsonBuilder().setPrettyPrinting().create(); long currentTimeMillis = System.currentTimeMillis(); - for (String notificationMsg : cambriaConsumer.fetch()) { + log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName()); + for (String notificationMsg : kafkaConsumer.poll()) { log.debug("received message from topic"); - log.debug("recieved notification from broker: {}", notificationMsg); + log.debug("received notification from broker: {}", notificationMsg); - final NotificationDataImpl notificationFromUEB = gson.fromJson(notificationMsg, NotificationDataImpl.class); - NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromUEB); + final NotificationDataImpl notificationFromMessageBus = gson.fromJson(notificationMsg, NotificationDataImpl.class); + NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromMessageBus); if (isActivateCallback(notificationForCallback)) { String stringNotificationForCallback = gson.toJson(notificationForCallback); log.debug("sending notification to client: {}", stringNotificationForCallback); @@ -73,8 +71,8 @@ class NotificationConsumer implements Runnable { } } catch (Exception e) { - log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage()); - log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e); + log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage()); + log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e); } } @@ -85,21 +83,21 @@ class NotificationConsumer implements Runnable { return hasRelevantArtifactsInResourceInstance || hasRelevantArtifactsInService; } - protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromUEB) { - List<IResourceInstance> relevantResourceInstances = buildResourceInstancesLogic(notificationFromUEB, currentTimeMillis); - List<ArtifactInfoImpl> relevantServiceArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, notificationFromUEB.getServiceArtifactsImpl()); - notificationFromUEB.setResources(relevantResourceInstances); - notificationFromUEB.setServiceArtifacts(relevantServiceArtifacts); - return notificationFromUEB; + protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromMessageBus) { + List<IResourceInstance> relevantResourceInstances = buildResourceInstancesLogic(notificationFromMessageBus, currentTimeMillis); + List<ArtifactInfoImpl> relevantServiceArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, notificationFromMessageBus.getServiceArtifactsImpl()); + notificationFromMessageBus.setResources(relevantResourceInstances); + notificationFromMessageBus.setServiceArtifacts(relevantServiceArtifacts); + return notificationFromMessageBus; } - private List<IResourceInstance> buildResourceInstancesLogic(NotificationDataImpl notificationFromUEB, long currentTimeMillis) { + private List<IResourceInstance> buildResourceInstancesLogic(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis) { List<IResourceInstance> relevantResourceInstances = new ArrayList<>(); - for (JsonContainerResourceInstance resourceInstance : notificationFromUEB.getResourcesImpl()) { + for (JsonContainerResourceInstance resourceInstance : notificationFromMessageBus.getResourcesImpl()) { final List<ArtifactInfoImpl> artifactsImplList = resourceInstance.getArtifactsImpl(); - List<ArtifactInfoImpl> foundRelevantArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, artifactsImplList); + List<ArtifactInfoImpl> foundRelevantArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, artifactsImplList); if (!foundRelevantArtifacts.isEmpty() || distributionClient.getConfiguration().isFilterInEmptyResources()) { resourceInstance.setArtifacts(foundRelevantArtifacts); relevantResourceInstances.add(resourceInstance); @@ -109,17 +107,17 @@ class NotificationConsumer implements Runnable { } - private List<ArtifactInfoImpl> handleRelevantArtifacts(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList) { + private List<ArtifactInfoImpl> handleRelevantArtifacts(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList) { List<ArtifactInfoImpl> relevantArtifacts = new ArrayList<>(); if (artifactsImplList != null) { for (ArtifactInfoImpl artifactInfo : artifactsImplList) { - handleRelevantArtifact(notificationFromUEB, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo); + handleRelevantArtifact(notificationFromMessageBus, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo); } } return relevantArtifacts; } - private void handleRelevantArtifact(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList, List<ArtifactInfoImpl> relevantArtifacts, ArtifactInfoImpl artifactInfo) { + private void handleRelevantArtifact(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList, List<ArtifactInfoImpl> relevantArtifacts, ArtifactInfoImpl artifactInfo) { boolean isArtifactRelevant = artifactsTypes.contains(artifactInfo.getArtifactType()); String artifactType = artifactInfo.getArtifactType(); if (artifactInfo.getGeneratedFromUUID() != null && !artifactInfo.getGeneratedFromUUID().isEmpty()) { @@ -131,16 +129,16 @@ class NotificationConsumer implements Runnable { } } if (isArtifactRelevant) { - setRelatedArtifacts(artifactInfo, notificationFromUEB); + setRelatedArtifacts(artifactInfo, notificationFromMessageBus); if (artifactType.equals(ArtifactTypeEnum.HEAT.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_VOL.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_NET.name())) { setGeneratedArtifact(artifactsImplList, artifactInfo); } relevantArtifacts.add(artifactInfo); } - IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromUEB.getDistributionID(), artifactInfo, isArtifactRelevant); + IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromMessageBus.getDistributionID(), artifactInfo, isArtifactRelevant); if (notificationStatus.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { - log.error("Error failed to send notification status to UEB failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult()); + log.error("Error failed to send notification status to MessageBus failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult()); } } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java index 5951ed0..2c69330 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java @@ -20,24 +20,23 @@ package org.onap.sdc.impl; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.onap.sdc.api.consumer.IStatusCallback; import org.onap.sdc.api.notification.IStatusData; +import org.onap.sdc.utils.kafka.SdcKafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - class StatusConsumer implements Runnable { - private static Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName()); + private static final Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName()); - private CambriaConsumer cambriaConsumer; - private IStatusCallback clientCallback; + private final SdcKafkaConsumer kafkaConsumer; + private final IStatusCallback clientCallback; - StatusConsumer(CambriaConsumer cambriaConsumer, IStatusCallback clientCallback) { - this.cambriaConsumer = cambriaConsumer; + StatusConsumer(SdcKafkaConsumer kafkaConsumer, IStatusCallback clientCallback) { + this.kafkaConsumer = kafkaConsumer; this.clientCallback = clientCallback; } @@ -46,18 +45,16 @@ class StatusConsumer implements Runnable { try { Gson gson = new GsonBuilder().setPrettyPrinting().create(); - for (String statusMsg : cambriaConsumer.fetch()) { + log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName()); + for (String statusMsg : kafkaConsumer.poll()) { log.debug("received message from topic"); - log.debug("recieved notification from broker: {}", statusMsg); + log.debug("received notification from broker: {}", statusMsg); IStatusData statusData = gson.fromJson(statusMsg, StatusDataImpl.class); clientCallback.activateCallback(statusData); - - } - } catch (Exception e) { - log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage()); - log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e); + log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage()); + log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e); } } |